In [0]:
from pyspark.sql import SparkSession
from delta.tables import *
import os

# Obtain (or reuse) the Spark session for this ingestion job.
spark = SparkSession.builder.appName("Data Ingestion").getOrCreate()

# Service account JSON key (stored in DBFS) used to authenticate against GCS.
# NOTE(review): the key sits under FileStore — confirm access to that location
# is restricted.
service_account_path = "/dbfs/FileStore/shared_uploads/auth/noob2_bootcamp_b22d638cc5c5.json"

# Session settings for the gs:// scheme: the two GCS Hadoop connector classes,
# followed by the key file to authenticate with. Applied in this order.
gcs_settings = {
    "fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
    "fs.AbstractFileSystem.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS",
    "fs.gs.auth.service.account.json.keyfile": service_account_path,
}
for setting_key, setting_value in gcs_settings.items():
    spark.conf.set(setting_key, setting_value)

# GCS locations: new files land in input/, processed files go to archive/.
bucket_name = "order_tracking_new"
data_directory = f"gs://{bucket_name}/input/"
archive_directory = f"gs://{bucket_name}/archive/"

# Storage location of the staging Delta table.
staging_table_path = "dbfs:/user/hive/warehouse/staging_order_tracking"

# Ingest pipeline: snapshot the CSV files currently in the input directory,
# load exactly those files into the staging Delta table, then archive them.
#
# The listing is taken once, up front, and drives both the read and the move.
# Reading the whole directory and listing it again afterwards would let a file
# that lands in between be archived without ever having been loaded, and would
# re-read non-CSV files on every run because they are never archived.
pending_files = [
    entry for entry in dbutils.fs.ls(data_directory)
    if entry.name.endswith(".csv")
]

if not pending_files:
    # Nothing to ingest: leave the existing staging table untouched instead of
    # attempting a read with no input files.
    print(f"No CSV files found in {data_directory}; nothing to ingest")
else:
    # Read only the snapshotted files (header row present, types inferred).
    df = spark.read.csv(
        [entry.path for entry in pending_files],
        inferSchema=True,
        header=True,
    )

    df.show()

    print("Create Stage Table")
    # Overwrite when a Delta table already exists at the path; otherwise use
    # append mode for the first write to the path.
    if DeltaTable.isDeltaTable(spark, staging_table_path):
        write_mode = "overwrite"
    else:
        write_mode = "append"
    df.write.format("delta").mode(write_mode).save(staging_table_path)

    print("Data Inserted In Stage Table")

    # Register the Delta location in the metastore so it can be queried by name.
    spark.sql(f"CREATE TABLE IF NOT EXISTS staging_order_tracking USING DELTA LOCATION '{staging_table_path}'")

    # Move each ingested file to the archive folder. The destination is built
    # as a URI with an explicit "/" separator rather than an OS path join.
    archive_root = archive_directory.rstrip("/")
    for entry in pending_files:
        dbutils.fs.mv(entry.path, f"{archive_root}/{entry.name}")
        # Report only after the move has succeeded.
        print(f"{entry.path} Moved in archive folder")

+---------+------------+-----------------+--------------------+----------------+-------------+---------------------+
|order_num|tracking_num|pck_recieved_date|package_deliver_date|          status|      address|last_update_timestamp|
+---------+------------+-----------------+--------------------+----------------+-------------+---------------------+
|     1000|     TRK1000|       2023-01-01|          2023-01-06|        Returned|   456 Oak St|  2023-01-01 05:41:55|
|     1001|     TRK1001|       2023-01-01|          2023-01-06|      In Transit|  789 Pine St|  2023-01-01 08:43:50|
|     1002|     TRK1002|       2023-01-01|          2023-01-05|Out for Delivery|   123 Elm St|  2023-01-01 17:44:29|
|     1003|     TRK1003|       2023-01-01|          2023-01-04|      In Transit|  789 Pine St|  2023-01-01 05:29:16|
|     1004|     TRK1004|       2023-01-01|          2023-01-02|         Delayed| 202 Birch Rd|  2023-01-01 13:12:49|
|     1005|     TRK1005|       2023-01-01|          2023-01-03| 