# Group: 14

Malika Lal 12220004

Snigdha Bhattacharjee 12220067

Deepak Sahu 12220074

Aadarsh Mohapatra 12220033


## Landing Zone

The landing zone is the storage area that receives data as it is first ingested. It is a raw, transient area: data lands here with little or no processing before it moves on to the staging zone.

In [0]:
%fs ls dbfs:/FileStore/tables/

path,name,size,modificationTime
dbfs:/FileStore/tables/Airline_Case_Graphframes-1.ipynb,Airline_Case_Graphframes-1.ipynb,9836,1679199794000
dbfs:/FileStore/tables/Airline_Case_Graphframes-2.ipynb,Airline_Case_Graphframes-2.ipynb,9836,1679200069000
dbfs:/FileStore/tables/Airline_Case_Graphframes.ipynb,Airline_Case_Graphframes.ipynb,9836,1679199002000
dbfs:/FileStore/tables/GraphFrames.ipynb,GraphFrames.ipynb,5789,1679199001000
dbfs:/FileStore/tables/GraphFramesCommands.txt,GraphFramesCommands.txt,2285,1679199001000
dbfs:/FileStore/tables/Market_Basket_Analysis_v1.dbc,Market_Basket_Analysis_v1.dbc,12180,1704530413000
dbfs:/FileStore/tables/Stream_processing_with_Kafka_v1.dbc,Stream_processing_with_Kafka_v1.dbc,11492,1704530413000
dbfs:/FileStore/tables/Working_with_JSON_and_SQL_1_0.dbc,Working_with_JSON_and_SQL_1_0.dbc,19220,1704530413000
dbfs:/FileStore/tables/airportsna.csv,airportsna.csv,11412,1679199002000
dbfs:/FileStore/tables/customers.json,customers.json,15815082,1704000459000


The command %fs ls dbfs:/FileStore/tables/ lists the files in the dbfs:/FileStore/tables/ directory of the Databricks File System (DBFS). For each file it returns the path, name, size in bytes, and modification time (milliseconds since the Unix epoch).
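The modificationTime values are easier to read once converted. A minimal sketch in plain Python, assuming the values are milliseconds since the Unix epoch and using two rows copied from the listing above; the helper name describe_file is made up for this illustration:

```python
from datetime import datetime, timezone

def describe_file(name, size_bytes, modification_ms):
    """Return a readable one-line summary of one `%fs ls` row (hypothetical helper)."""
    # modificationTime is assumed to be in milliseconds, so divide by 1000
    modified = datetime.fromtimestamp(modification_ms / 1000, tz=timezone.utc)
    size_mib = size_bytes / (1024 * 1024)
    return f"{name}: {size_mib:.2f} MiB, modified {modified:%Y-%m-%d %H:%M:%S} UTC"

# Two rows copied from the listing above
rows = [
    ("airportsna.csv", 11412, 1679199002000),
    ("customers.json", 15815082, 1704000459000),
]
summaries = [describe_file(*row) for row in rows]
for line in summaries:
    print(line)
```

The timestamps are printed in UTC; a different time zone can be passed to fromtimestamp if local time is preferred.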

In [0]:
spark.conf.set("spark.sql.legacy.timeParserPolicy","LEGACY")

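The call spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY") sets a Spark SQL configuration option on the current SparkSession. With the LEGACY policy, Spark parses and formats dates and timestamps the way it did before Spark 3.0, which is more lenient when a pattern does not exactly match the input. The to_date conversion applied to the customer data later in this notebook uses a pattern of this kind (MM/dd/yy against values such as 8/22/2006).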

In [0]:
%sql 
DROP DATABASE IF EXISTS landing_orders CASCADE;

CREATE DATABASE landing_orders;

-- Drop the landing_orders database if it already exists, then create it again.

This creates the landing database. Any existing landing_orders database is dropped first (CASCADE also drops its tables), so the notebook always starts from an empty database.

In [0]:
%fs rm -r dbfs:/user/hive/warehouse/landing_orders.db/orders


The %fs rm -r command removes files and directories in the Databricks File System (DBFS). Here it recursively removes the directory that holds the data files of the orders table in the landing_orders database, so that no stale files remain when the table is recreated.

## Loading CSV File

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# Define the custom schema. Every column except order_id is read as a string.
# Note: Spark treats columns read from files as nullable, so the False flags on
# item_id and qty_ordered are not enforced (the printSchema output shows nullable = true).
fields = StructType([StructField("order_id", IntegerType(), True),
                     StructField("order_date", StringType(), True),
                     StructField("status", StringType(), True),
                     StructField("item_id", StringType(), False),
                     StructField("qty_ordered", StringType(), False),
                     StructField("price", StringType(), True),
                     StructField("value", StringType(), True),
                     StructField("discount_amount", StringType(), True),
                     StructField("total", StringType(), True),
                     StructField("category", StringType(), True),
                     StructField("payment_method", StringType(), True),
                     StructField("cust_id", StringType(), True),
                     StructField("year", StringType(), True),
                     StructField("month", StringType(), True)])

# Read from landing zone into dataframe (fields is already a StructType)
Orders_df = spark.read.csv("/FileStore/tables/orders.csv",
                           header=True,
                           schema=fields)

The cell imports the required types from the pyspark.sql.types module, then defines a custom schema with StructType, giving each field a name, a data type, and a nullable flag. Finally, it reads the CSV file using that schema.
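Before relying on a hand-written schema, it can help to confirm that the file's header row matches the expected column names, since by default Spark applies a user-supplied schema to CSV columns by position. A minimal sketch in plain Python, independent of Spark; the helper check_header and the inline sample text are assumptions for illustration, and the column names are the ones from the schema above:

```python
import csv
import io

EXPECTED_COLUMNS = [
    "order_id", "order_date", "status", "item_id", "qty_ordered", "price",
    "value", "discount_amount", "total", "category", "payment_method",
    "cust_id", "year", "month",
]

def check_header(text_stream, expected=EXPECTED_COLUMNS):
    """Return (missing, unexpected) column names for a CSV text stream (hypothetical helper)."""
    header = next(csv.reader(text_stream))
    header = [name.strip() for name in header]
    missing = [name for name in expected if name not in header]
    unexpected = [name for name in header if name not in expected]
    return missing, unexpected

# Inline sample standing in for the first lines of orders.csv
sample = io.StringIO(",".join(EXPECTED_COLUMNS) + "\n100354678,2020-10-01,received\n")
missing, unexpected = check_header(sample)
print("missing:", missing, "unexpected:", unexpected)
```

Two empty lists mean the header agrees with the schema; anything else is worth investigating before the data is loaded.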

In [0]:
Orders_df.show(20) ## Checking the loaded data.

+---------+----------+--------------+--------+-----------+-----+------+-----------------+---------+-----------------+--------------+-------+----+--------+
| order_id|order_date|        status| item_id|qty_ordered|price| value|  discount_amount|    total|         category|payment_method|cust_id|year|   month|
+---------+----------+--------------+--------+-----------+-----+------+-----------------+---------+-----------------+--------------+-------+----+--------+
|100354678|2020-10-01|      received|574772.0|       21.0| 89.9|1798.0|              0.0|   1798.0|    Men's Fashion|           cod|60124.0|2020|Oct-2020|
|100354678|2020-10-01|      received|574774.0|       11.0| 19.0| 190.0|              0.0|    190.0|    Men's Fashion|           cod|60124.0|2020|Oct-2020|
|100354680|2020-10-01|      complete|574777.0|        9.0|149.9|1199.2|              0.0|   1199.2|    Men's Fashion|           cod|60124.0|2020|Oct-2020|
|100354680|2020-10-01|      complete|574779.0|        9.0| 79.9| 639.2

In [0]:
Orders_df.printSchema() ## Checking the schema of the loaded data in the landing zone.

root
 |-- order_id: integer (nullable = true)
 |-- order_date: string (nullable = true)
 |-- status: string (nullable = true)
 |-- item_id: string (nullable = true)
 |-- qty_ordered: string (nullable = true)
 |-- price: string (nullable = true)
 |-- value: string (nullable = true)
 |-- discount_amount: string (nullable = true)
 |-- total: string (nullable = true)
 |-- category: string (nullable = true)
 |-- payment_method: string (nullable = true)
 |-- cust_id: string (nullable = true)
 |-- year: string (nullable = true)
 |-- month: string (nullable = true)



In [0]:
from pyspark.sql.functions import col

# Cast the identifier, quantity, and year columns from string to integer
for column_name in ["item_id", "qty_ordered", "cust_id", "year"]:
    Orders_df = Orders_df.withColumn(column_name, col(column_name).cast("integer"))

Orders_df.printSchema()

root
 |-- order_id: integer (nullable = true)
 |-- order_date: string (nullable = true)
 |-- status: string (nullable = true)
 |-- item_id: integer (nullable = true)
 |-- qty_ordered: integer (nullable = true)
 |-- price: string (nullable = true)
 |-- value: string (nullable = true)
 |-- discount_amount: string (nullable = true)
 |-- total: string (nullable = true)
 |-- category: string (nullable = true)
 |-- payment_method: string (nullable = true)
 |-- cust_id: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)



Changing the data type of a few columns (item_id, qty_ordered, cust_id, and year) from string to integer so that computations in the staging zone are easier.
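The values in these columns arrive as strings such as 574772.0, so an integer cast is only lossless when every value is a whole number. A minimal sketch in plain Python of that check, using values from the output above; is_whole_number is a hypothetical helper, not part of Spark:

```python
from decimal import Decimal, InvalidOperation

def is_whole_number(text):
    """Return True if the string is a number with no fractional part (hypothetical helper)."""
    try:
        value = Decimal(text.strip())
    except InvalidOperation:
        return False
    # Reject NaN and infinities, then compare against the integral value
    return value.is_finite() and value == value.to_integral_value()

# One sample value per column, copied from the output above
samples = {"item_id": "574772.0", "qty_ordered": "21.0", "cust_id": "60124.0", "price": "89.9"}
safe_to_cast = {column: is_whole_number(value) for column, value in samples.items()}
print(safe_to_cast)
```

Columns such as price hold fractional values (89.9), so an integer cast would not be lossless for them.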

In [0]:
Orders_df.show(10) ## Checking the data again after changing the data types.

+---------+----------+--------------+-------+-----------+-----+------+---------------+------+-----------------+--------------+-------+----+--------+
| order_id|order_date|        status|item_id|qty_ordered|price| value|discount_amount| total|         category|payment_method|cust_id|year|   month|
+---------+----------+--------------+-------+-----------+-----+------+---------------+------+-----------------+--------------+-------+----+--------+
|100354678|2020-10-01|      received| 574772|         21| 89.9|1798.0|            0.0|1798.0|    Men's Fashion|           cod|  60124|2020|Oct-2020|
|100354678|2020-10-01|      received| 574774|         11| 19.0| 190.0|            0.0| 190.0|    Men's Fashion|           cod|  60124|2020|Oct-2020|
|100354680|2020-10-01|      complete| 574777|          9|149.9|1199.2|            0.0|1199.2|    Men's Fashion|           cod|  60124|2020|Oct-2020|
|100354680|2020-10-01|      complete| 574779|          9| 79.9| 639.2|            0.0| 639.2|    Men's Fas

In [0]:
%sql
DROP TABLE IF EXISTS landing_orders.orders; -- Drop the table if it already exists so that it can be recreated.

In [0]:
# Save the DataFrame as a Spark SQL table: database "landing_orders", table "orders".
Orders_df.write.saveAsTable("landing_orders.orders")



In [0]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
  .appName("Databricks Shell") \
  .config("spark.sql.hive.metastorePartitionPruningFallbackOnException", "true") \
  .getOrCreate()

In [0]:
sql_query = """
SELECT * FROM landing_orders.orders LIMIT 10
"""

result_df = spark.sql(sql_query)
result_df.show() 

+---------+----------+--------------+-------+-----------+-----+------+---------------+------+-----------------+--------------+-------+----+--------+
| order_id|order_date|        status|item_id|qty_ordered|price| value|discount_amount| total|         category|payment_method|cust_id|year|   month|
+---------+----------+--------------+-------+-----------+-----+------+---------------+------+-----------------+--------------+-------+----+--------+
|100354678|2020-10-01|      received| 574772|         21| 89.9|1798.0|            0.0|1798.0|    Men's Fashion|           cod|  60124|2020|Oct-2020|
|100354678|2020-10-01|      received| 574774|         11| 19.0| 190.0|            0.0| 190.0|    Men's Fashion|           cod|  60124|2020|Oct-2020|
|100354680|2020-10-01|      complete| 574777|          9|149.9|1199.2|            0.0|1199.2|    Men's Fashion|           cod|  60124|2020|Oct-2020|
|100354680|2020-10-01|      complete| 574779|          9| 79.9| 639.2|            0.0| 639.2|    Men's Fas

Checking the table data after loading it from the DataFrame.

## Loading JSON File


In [0]:
## Read the customer json file
Customer = spark.read.json("/FileStore/tables/customers.json")

In [0]:
Customer.show(10) ## Checking the file data

+-----------+-----------+--------------+--------------------+------+-----------+---------+-----+-----+----+-------+-----------------+
|       City|     County|Customer Since|              E Mail|Gender| Place Name|   Region|State|  Zip| age|cust_id|        full_name|
+-----------+-----------+--------------+--------------------+------+-----------+---------+-----+-----+----+-------+-----------------+
|     Vinson|     Harmon|     8/22/2006|jani.titus@gmail.com|     F|     Vinson|    South|   OK|73571|43.0|60124.0|      Titus, Jani|
|     Graham|   Bradford|      2/4/1981| lee.eaker@gmail.com|     M|     Graham|    South|   FL|32042|28.0|42485.0|       Eaker, Lee|
|Grand Forks|Grand Forks|     6/27/2010|jason.simoneau@gm...|     M|Grand Forks|  Midwest|   ND|58201|65.0|53620.0|  Simoneau, Jason|
|Laupahoehoe|     Hawaii|      4/3/1992|grover.bayless@ya...|     M|Laupahoehoe|     West|   HI|96764|33.0|56836.0|  Bayless, Grover|
|     Glendo|     Platte|     6/21/2015|albertina.bensen@...| 

In [0]:
Customer.printSchema() ## Checking the imported schema

root
 |-- City: string (nullable = true)
 |-- County: string (nullable = true)
 |-- Customer Since: string (nullable = true)
 |-- E Mail: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Place Name: string (nullable = true)
 |-- Region: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Zip: long (nullable = true)
 |-- age: double (nullable = true)
 |-- cust_id: double (nullable = true)
 |-- full_name: string (nullable = true)



In [0]:
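from pyspark.sql.functions import to_date, col

# Parse "Customer Since" (e.g. 8/22/2006) into a date. The MM/dd/yy pattern does not
# exactly match these values; this notebook runs with the LEGACY time parser policy set earlier.
Customer = Customer.withColumn("Customer Since", to_date(col("Customer Since"), "MM/dd/yy"))
# Cast Zip and cust_id to integer
Customer = Customer.withColumn("Zip",  col("Zip").cast("integer"))
Customer = Customer.withColumn("cust_id",  col("cust_id").cast("integer"))
# Remove the spaces from column names
Customer = (Customer
            .withColumnRenamed("Customer Since", "CustomerSince")
            .withColumnRenamed("Place Name", "PlaceName")
            .withColumnRenamed("E Mail", "EMail"))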


Customer.printSchema()

root
 |-- City: string (nullable = true)
 |-- County: string (nullable = true)
 |-- CustomerSince: date (nullable = true)
 |-- EMail: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- PlaceName: string (nullable = true)
 |-- Region: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Zip: integer (nullable = true)
 |-- age: double (nullable = true)
 |-- cust_id: integer (nullable = true)
 |-- full_name: string (nullable = true)



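These are basic transformations: Customer Since is converted to a date, Zip and cust_id are cast to integer, and the columns whose names contain spaces are renamed. Printing the schema confirms the changes.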
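The date pattern has to agree with the shape of the input, in particular the number of year digits. As a Spark-free analogy (Python's strptime is not Spark's parser and uses different pattern letters), the sketch below parses sample values from the output above with a four-digit-year format and shows that a strict parser rejects a two-digit-year format for the same input. The helper parse_customer_since is hypothetical:

```python
from datetime import datetime

def parse_customer_since(text, fmt="%m/%d/%Y"):
    """Parse a month/day/year string into a date, or return None if it does not match."""
    try:
        return datetime.strptime(text, fmt).date()
    except ValueError:
        return None

# Sample values copied from the Customer output above
samples = ["8/22/2006", "2/4/1981", "6/27/2010"]
parsed = [parse_customer_since(value) for value in samples]
print(parsed)

# A two-digit-year format leaves the last two digits unconsumed,
# so a strict parser rejects the value instead of guessing
strict_two_digit = parse_customer_since("8/22/2006", fmt="%m/%d/%y")
print(strict_two_digit)
```

Returning None for a mismatch mirrors the idea of getting a null rather than a wrong date; how Spark itself reacts depends on the time parser policy in effect.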

In [0]:
Customer.show(10) ## Checking the transformed data

+-----------+-----------+-------------+--------------------+------+-----------+---------+-----+-----+----+-------+-----------------+
|       City|     County|CustomerSince|               EMail|Gender|  PlaceName|   Region|State|  Zip| age|cust_id|        full_name|
+-----------+-----------+-------------+--------------------+------+-----------+---------+-----+-----+----+-------+-----------------+
|     Vinson|     Harmon|   2006-08-22|jani.titus@gmail.com|     F|     Vinson|    South|   OK|73571|43.0|  60124|      Titus, Jani|
|     Graham|   Bradford|   1981-02-04| lee.eaker@gmail.com|     M|     Graham|    South|   FL|32042|28.0|  42485|       Eaker, Lee|
|Grand Forks|Grand Forks|   2010-06-27|jason.simoneau@gm...|     M|Grand Forks|  Midwest|   ND|58201|65.0|  53620|  Simoneau, Jason|
|Laupahoehoe|     Hawaii|   1992-04-03|grover.bayless@ya...|     M|Laupahoehoe|     West|   HI|96764|33.0|  56836|  Bayless, Grover|
|     Glendo|     Platte|   2015-06-21|albertina.bensen@...|     F|  

In [0]:
%fs rm -r dbfs:/user/hive/warehouse/landing_orders.db/CustomerMaster 

In [0]:


dbutils.fs.rm("dbfs:/user/hive/warehouse/landing_orders.db/customermaster", True)



Out[24]: False

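The two commands above remove any files and directories left behind by the customermaster table. The dbutils.fs.rm call returned False, which suggests that it found nothing to delete at that path.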
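The two commands target the same table directory with different capitalisation. A minimal sketch of building that path consistently, assuming the default warehouse root seen in the paths above and assuming that the metastore stores database and table names in lower case; warehouse_path is a hypothetical helper:

```python
WAREHOUSE_ROOT = "dbfs:/user/hive/warehouse"  # root taken from the paths used in this notebook

def warehouse_path(database, table, root=WAREHOUSE_ROOT):
    """Build the assumed default directory of a managed table (hypothetical helper)."""
    # Assumption: names are lower-cased, matching the customermaster path above
    return f"{root}/{database.lower()}.db/{table.lower()}"

customer_dir = warehouse_path("landing_orders", "CustomerMaster")
orders_dir = warehouse_path("landing_orders", "orders")
print(customer_dir)
print(orders_dir)
```

The resulting string can then be passed to dbutils.fs.rm(path, True), as in the cell above.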

In [0]:
%sql
DROP TABLE IF EXISTS landing_orders.customermaster; --- Drop table if exists 

In [0]:
Customer.write.saveAsTable("landing_orders.customermaster") # Save the DataFrame as a table in the landing_orders database.


In [0]:

sql_query = """
SELECT * FROM landing_orders.customermaster limit 10;
"""

result_df = spark.sql(sql_query)
result_df.show() 

## Viewing the customer master data loaded in the database.

+-----------+-----------+-------------+--------------------+------+-----------+---------+-----+-----+----+-------+-----------------+
|       City|     County|CustomerSince|               EMail|Gender|  PlaceName|   Region|State|  Zip| age|cust_id|        full_name|
+-----------+-----------+-------------+--------------------+------+-----------+---------+-----+-----+----+-------+-----------------+
|     Vinson|     Harmon|   2006-08-22|jani.titus@gmail.com|     F|     Vinson|    South|   OK|73571|43.0|  60124|      Titus, Jani|
|     Graham|   Bradford|   1981-02-04| lee.eaker@gmail.com|     M|     Graham|    South|   FL|32042|28.0|  42485|       Eaker, Lee|
|Grand Forks|Grand Forks|   2010-06-27|jason.simoneau@gm...|     M|Grand Forks|  Midwest|   ND|58201|65.0|  53620|  Simoneau, Jason|
|Laupahoehoe|     Hawaii|   1992-04-03|grover.bayless@ya...|     M|Laupahoehoe|     West|   HI|96764|33.0|  56836|  Bayless, Grover|
|     Glendo|     Platte|   2015-06-21|albertina.bensen@...|     F|  