In [None]:
parameters = [
    {"source":"global_fashion_sales", "table_name":"customers", "suffix":"gfs", "partion_column": "Date_Of_Birth" },
    {"source":"global_fashion_sales", "table_name":"discounts", "suffix":"gfs"},
    {"source":"global_fashion_sales", "table_name":"employees", "suffix":"gfs"},
    {"source":"global_fashion_sales", "table_name":"products", "suffix":"gfs"},
    {"source":"global_fashion_sales", "table_name":"stores", "suffix":"gfs"},
    {"source":"global_fashion_sales", "table_name":"transactions", "suffix":"gfs", "partion_column": "Date"}
    ]

In [1]:
parameters = [
    {"source":"global_fashion_sales", "table_name":"transactions", "suffix":"gfs", "partion_column": "Date"}
    ]

In [2]:
import findspark
findspark.init()
findspark.find()

'c:\\Users\\user\\miniconda3\\lib\\site-packages\\pyspark'

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import year, col, lit
from datetime import datetime
import logging

In [None]:
log_file = "silver_layer.log"

# Remove any existing handlers
for handler in logging.root.handlers[:]:
    logging.root.removeHandler(handler)

# Configure logging
logging.basicConfig(
    filename=log_file,
    level=logging.INFO,
    filemode="w",  # Overwrites log file on each run
    format="%(asctime)s - %(levelname)s - %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S"
)

logging.info("Logging system initialized.")

In [None]:
logging.info("Starting Spark Session")

spark = SparkSession.builder\
        .appName("Silver_Layer_Ingesion")\
        .config("spark.executor.memory", "4g") \
        .config("spark.driver.memory", "4g")\
        .config("spark.sql.shuffle.partitions", "200")\
        .enableHiveSupport()\
        .getOrCreate()

logging.info("Spark Session started")

In [None]:
def cleaning_data(source, file_name, partition_column = None):
        try:
                logs ={}
                basepath = "hdfs://0.0.0.0:19000/bronze_layer"
                dataset_path = basepath +"/"+ source + "/" + file_name + "/"
                print(dataset_path)
                data = spark.read\
                        .option("header","true")\
                        .option("inferSchema","true")\
                        .csv(dataset_path)

                #remving duplicates
                data = data.distinct()
                
                #renaming column
                data = data.select([col(c).alias(c.replace(" ", "_")) for c in data.columns])

                # adding partitioning column
                if partition_column != None:
                        data = data.withColumn("year", year(col(partition_column)))
                else:
                        current_year = datetime.now().year
                        data = data.withColumn("year", lit(current_year))
                logs["dataset_path"] = dataset_path
                logs["file_name"] = file_name
                logs["no_of_rows"] = data.count()
                print(logs)
                logging.info(f"logs: {logs}")
                return data
        except Exception as e:
                logging.error(f"Error cleaning data: {str(e)}", exc_info=True)



In [None]:
def silver_layer_ingestion(data, table_name, suffix):
    try:
        database = "default"  # Change if needed
        table = suffix + "_" + table_name

        spark.sql(f"CREATE DATABASE IF NOT EXISTS {database}")
        spark.sql(f"USE {database}")

        logging.info(f"started loading {table_name} data to hive")
        # Save the DataFrame to Hive as a table
        data.write.mode("overwrite").partitionBy("year").format("parquet").saveAsTable(f"{database}.{table}")
        logging.info(f"{table_name} has successfully loaded")
    except Exception as e:
                logging.error(f"Error loading data to hive: {str(e)}", exc_info=True)


In [10]:
for i in range(len(parameters)):
    source = parameters[i].get('source')
    table_name = parameters[i].get("table_name")
    suffix = parameters[i].get("suffix")
    partition_column = parameters[i].get("partion_column", None)
    data = cleaning_data(source,table_name,partition_column)
    silver_layer_ingestion(data, table_name, suffix)

hdfs://0.0.0.0:19000/bronze_layer/global_fashion_sales/transactions/
6416029
DataFrame saved successfully in Hive!


In [None]:
logging.info("Stopping spark session")
spark.stop()
logging("Spark Session stop, Job has been completed")

In [4]:
spark.sql("select * from gfs_transactions").show()

+-------------------+----+-----------+----------+----+---------+----------+--------+-------------------+--------+----------+--------+-----------+--------+---------------+--------------------+----------------+--------------+-------------+----+
|         Invoice_ID|Line|Customer_ID|Product_ID|Size|    Color|Unit_Price|Quantity|               Date|Discount|Line_Total|Store_ID|Employee_ID|Currency|Currency_Symbol|                 SKU|Transaction_Type|Payment_Method|Invoice_Total|year|
+-------------------+----+-----------+----------+----+---------+----------+--------+-------------------+--------+----------+--------+-----------+--------+---------------+--------------------+----------------+--------------+-------------+----+
|INV-US-001-03764542|   6|      73817|      9258|   S|    WHITE|      35.5|       1|2024-07-28 20:51:00|     0.0|      35.5|       1|         10|     USD|              $|    FESW9258-S-WHITE|            Sale|          Cash|        361.0|2024|
|INV-US-001-03745464|   1|  

In [13]:
database = "default"  # Change if needed
table = "customers"

In [14]:
spark.sql(f"CREATE DATABASE IF NOT EXISTS {database}")
spark.sql(f"USE {database}")

# Save the DataFrame to Hive as a table
df.write.mode("overwrite").partitionBy("year").format("parquet").saveAsTable(f"{database}.{table}")

print("DataFrame saved successfully in Hive!")

DataFrame saved successfully in Hive!


In [14]:
spark.sql("SHOW TABLES").show()

+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
|  default|customers|      false|
+---------+---------+-----------+



In [15]:
spark.sql("select * from customers").show()

+-----------+------+-----------------------+-----------+----+-------+------+-------------+---------+----+
|Customer_ID|  Name|                  Email|  Telephone|City|Country|Gender|Date_Of_Birth|Job_Title|year|
+-----------+------+-----------------------+-----------+----+-------+------+-------------+---------+----+
|     600985|安雪梅|  安雪梅@fake_gmail.com|18884336961|惠州|   中国|     M|   2003-03-29|     NULL|2003|
|     435522|  李娟|    李娟@fake_yahoo.com|13846999218|北京|   中国|     M|   2003-01-19|     NULL|2003|
|     600992|  娄龙|    娄龙@fake_yahoo.com|15967115592|惠州|   中国|     M|   2003-04-03|     NULL|2003|
|     435533|杨春梅|杨春梅@fake_hotmail.com|13952890708|北京|   中国|     F|   2003-12-03|     NULL|2003|
|     601017|  李颖|  李颖@fake_hotmail.com|15548597618|惠州|   中国|     F|   2003-08-07|     NULL|2003|
|     435548|徐小红|  徐小红@fake_gmail.com|14757181563|北京|   中国|     M|   2003-06-10|     NULL|2003|
|     601018|  孙玉|    孙玉@fake_gmail.com|14789406324|惠州|   中国|     F|   2003-08-10|     NULL|2003|
| 

In [10]:
spark.stop()

ConnectionRefusedError: [WinError 10061] No connection could be made because the target machine actively refused it