In [0]:
# Cleaning up the tables if present
dbutils.fs.rm('dbfs:/user/hive/warehouse/', True)
tbl = spark.sql("SHOW TABLES").collect()
for i in tbl:
    spark.sql(f"DROP TABLE {i.tableName}")

In [0]:
%sql
-- Creating and inserting the table and data
-- Create Customers Table
CREATE TABLE IF NOT EXISTS Customers (
    CustomerID INT,
    FirstName VARCHAR(50),
    LastName VARCHAR(50),
    Email VARCHAR(100)
);

-- Create Products Table
CREATE TABLE IF NOT EXISTS Products (
    ProductID INT,
    ProductName VARCHAR(100),
    Price DECIMAL(10, 2)
);

-- Create Orders Table
CREATE TABLE IF NOT EXISTS Orders (
    OrderID INT,
    CustomerID INT,
    ProductID INT,
    Quantity INT,
    OrderDate DATE
);

INSERT INTO Customers (CustomerID, FirstName, LastName, Email)
VALUES
(1, 'John', 'Doe', 'john.doe@example.com'),
(2, 'Jane', 'Smith', 'jane.smith@example.com');

-- Insert sample data into Products
INSERT INTO Products (ProductID, ProductName, Price)
VALUES
(1, 'Laptop', 999.99),
(2, 'Smartphone', 499.99);

-- Insert sample data into Orders
INSERT INTO Orders (OrderID, CustomerID, ProductID, Quantity, OrderDate)
VALUES
(1, 1, 1, 1, '2024-08-01'),
(2, 1, 2, 2, '2024-08-02'),
(3, 2, 1, 1, '2024-08-03');

num_affected_rows,num_inserted_rows
3,3


In [0]:
tbl = spark.sql("SHOW TABLES").collect()
print("Active tables:-")
for i in tbl:
    print(i.tableName+":")
    spark.sql(f"DESCRIBE {i.tableName}").show()

Active tables:-
customers:
+----------+------------+-------+
|  col_name|   data_type|comment|
+----------+------------+-------+
|CustomerID|         int|   null|
| FirstName| varchar(50)|   null|
|  LastName| varchar(50)|   null|
|     Email|varchar(100)|   null|
+----------+------------+-------+

orders:
+----------+---------+-------+
|  col_name|data_type|comment|
+----------+---------+-------+
|   OrderID|      int|   null|
|CustomerID|      int|   null|
| ProductID|      int|   null|
|  Quantity|      int|   null|
| OrderDate|     date|   null|
+----------+---------+-------+

products:
+-----------+-------------+-------+
|   col_name|    data_type|comment|
+-----------+-------------+-------+
|  ProductID|          int|   null|
|ProductName| varchar(100)|   null|
|      Price|decimal(10,2)|   null|
+-----------+-------------+-------+



In [0]:
# Creating spark dataframes
df_and_commands = {}
for i in tbl:
    df_name = f"{i.tableName[:4]}_df"
    df_and_commands[df_name] = spark.read.option('inferSchema', 'true').table(f'{i.tableName}')
locals().update(df_and_commands)
print("Spark Dataframes created:-")
for df_name, df in df_and_commands.items():
    print(df_name+":")
    print(df.printSchema())

Spark Dataframes created:-
cust_df:
root
 |-- CustomerID: integer (nullable = true)
 |-- FirstName: string (nullable = true)
 |-- LastName: string (nullable = true)
 |-- Email: string (nullable = true)

None
orde_df:
root
 |-- OrderID: integer (nullable = true)
 |-- CustomerID: integer (nullable = true)
 |-- ProductID: integer (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- OrderDate: date (nullable = true)

None
prod_df:
root
 |-- ProductID: integer (nullable = true)
 |-- ProductName: string (nullable = true)
 |-- Price: decimal(10,2) (nullable = true)

None


In [0]:
# Creating Pandas Dataframe
pddf_and_commands = {}
for df_name, df in df_and_commands.items():
    pddf_and_commands[df_name+"_pd"] = df.toPandas().infer_objects()
locals().update(pddf_and_commands)
print("Pandas Dataframe created:-")
for df_name, df in pddf_and_commands.items():
    print(df_name+":")
    print(df.dtypes)

