In [4]:
import findspark

# Local Spark installation. A raw string is used so the Windows backslash is not
# parsed as an escape sequence ('\s' is an invalid escape and triggers a warning on
# recent Python versions).
# TODO: make this configurable instead of a machine-specific absolute path.
SPARK_HOME_DIR = r'C:\spark-3.4.1-bin-hadoop3'

# findspark must run before `import pyspark` so that the pyspark shipped with the
# installation above is importable.
findspark.init(SPARK_HOME_DIR)

import pyspark
from delta import DeltaTable, configure_spark_with_delta_pip

# Azure storage connectors for the abfss:// paths used in this notebook.
# They are passed as `extra_packages` because configure_spark_with_delta_pip() writes
# spark.jars.packages itself (the Delta artifact plus extra_packages), replacing any
# value previously set on the builder.
# NOTE(review): confirm the hadoop-azure version matches the Hadoop version bundled
# with the Spark distribution above.
AZURE_PACKAGES = [
    "org.apache.hadoop:hadoop-azure:3.3.6",
    "com.microsoft.azure:azure-storage:8.6.6",
]

builder = (
    pyspark.sql.SparkSession.builder.appName("globant_test")
    # Enable the Delta Lake SQL extensions and use Delta's catalog as the session catalog.
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    # Partition overwrites replace only the partitions present in the written data.
    .config("spark.sql.sources.partitionOverwriteMode", "dynamic")
)

spark = configure_spark_with_delta_pip(builder, extra_packages=AZURE_PACKAGES).getOrCreate()

In [51]:
import configparser

# Azure storage account credentials and container names are read from config.ini
# (it holds the account key, so keep it out of version control).
# Named azure_config so it does not clash with the table->schema dict defined later.
azure_config = configparser.RawConfigParser()
if not azure_config.read('config.ini'):
    # read() silently skips files it cannot open and returns the list it did parse;
    # fail here with a clear message instead of a KeyError on 'azure' below.
    raise FileNotFoundError("config.ini not found or unreadable; expected an [azure] section")

azure_section = azure_config['azure']
account_key = azure_section['accountKey']
account_name = azure_section['accountName']
raw_container = azure_section['rawContainer']
silver_container = azure_section['silverContainer']

# Register the storage account key on the JVM-side Hadoop configuration so abfss://
# access to this account is authenticated. (_jsc is a private PySpark attribute.)
spark._jsc.hadoopConfiguration().set(f"fs.azure.account.key.{account_name}.dfs.core.windows.net", account_key)

# Filesystem roots: raw container (CSV input) and silver container (Delta output).
raw_fs = f'abfss://{raw_container}@{account_name}.dfs.core.windows.net'
hmz_fs = f'abfss://{silver_container}@{account_name}.dfs.core.windows.net'
# Folder prefix used under both container roots.
file_path = 'globant'
file_path = 'globant'

In [54]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType

# Explicit read schemas for the headerless raw CSV extracts; every column is nullable.
# NOTE(review): 'datetime' is typed DateType, which carries no time of day —
# confirm a date-only value is intended rather than TimestampType.
hired_employees_schema = StructType([
    StructField(column, data_type, True)
    for column, data_type in (
        ('id', IntegerType()),
        ('name', StringType()),
        ('datetime', DateType()),
        ('department_id', IntegerType()),
        ('job_id', IntegerType()),
    )
])

departments_schema = StructType([
    StructField(column, data_type, True)
    for column, data_type in (
        ('id', IntegerType()),
        ('department', StringType()),
    )
])

jobs_schema = StructType([
    StructField(column, data_type, True)
    for column, data_type in (
        ('id', IntegerType()),
        ('job', StringType()),
    )
])

# Table name -> read schema. The table name is also used to build the source CSV path
# and the target Delta folder name.
config = dict(
    hired_employees=hired_employees_schema,
    departments=departments_schema,
    jobs=jobs_schema,
)

In [56]:
# Load every raw CSV with its declared schema (files have no header row),
# keyed by table name.
df_ = {
    table_name: spark.read.csv(
        f'{raw_fs}/{file_path}/{table_name}.csv',
        header=False,
        schema=table_schema,
    )
    for table_name, table_schema in config.items()
}


In [71]:
def _upsert_to_delta(source_df, target_path):
    """Upsert ``source_df`` into the Delta table at ``target_path``, creating it if absent.

    If a Delta table already exists at ``target_path``, rows are merged on ``id``:
    matching rows are updated with all source columns, unmatched rows are inserted,
    and the table is then compacted. Otherwise ``source_df`` is written as a new
    Delta table (overwrite mode).

    NOTE(review): the merge matches on ``id`` equality only. Source rows with a null
    ``id`` never match (NULL = NULL is not true) and would be inserted again on every
    run. Multiple source rows sharing an ``id`` make the merge fail. Confirm the raw
    data guarantees unique, non-null ids.

    Returns:
        The DeltaTable handle for ``target_path``.
    """
    if DeltaTable.isDeltaTable(spark, target_path):
        target = DeltaTable.forPath(spark, target_path)
        (
            target.alias('current')
            .merge(source_df.alias('new'), condition="current.id  = new.id")
            .whenMatchedUpdateAll()
            .whenNotMatchedInsertAll()
            .execute()
        )
        # Bin-pack the table's small files after the merge.
        target.optimize().executeCompaction()
        return target

    source_df.write.format("delta").mode("overwrite").save(target_path)
    # Also return a handle for newly created tables so df_delta_ covers every table.
    return DeltaTable.forPath(spark, target_path)


# Table name -> DeltaTable handle for the silver-layer copy of that table.
df_delta_ = {}
for table in config:
    df_delta_[table] = _upsert_to_delta(df_[table], f'{hmz_fs}/{file_path}/{table}_delta')

In [72]:
# Stop the Spark session created at the top of the notebook; run only after every
# table has been written, since later cells would need a new session.
spark.stop()

183