<a href="https://colab.research.google.com/github/Sumitampba/ISBMLUL2B20/blob/main/G_16_Landing_to_Staging_V2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [117]:
# Import necessary librar
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, DateType
from pyspark.sql.functions import col

# Initialize Spark session
spark = SparkSession.builder \
    .appName("Data Ingestion") \
    .getOrCreate()


In [118]:
# Define schema for orders.csv
orders_schema = StructType([
    StructField("index", StringType(), True),
    StructField("order_id", StringType(), True),
    StructField("order_date", StringType(), True),
    StructField("item_id", StringType(), True),
    StructField("qty_ordered", IntegerType(), True),
    StructField("price", IntegerType(), True),
    StructField("value", FloatType(), True),
    StructField("discount_amount", FloatType(), True),
    StructField("total", FloatType(), True),
    StructField("category", StringType(), True),
    StructField("payment_method", StringType(), True),
    StructField("cust_id", StringType(), True),
    StructField("final_status", StringType(), True)
])

In [119]:
# Read orders.csv
orders_df = spark.read.csv("/content/sample_data/Sales_landing_zone/orders/orders_01_21.csv", schema=orders_schema, header=True)
orders_df.show(10)

+-----+---------+----------+-------+-----------+-----+------+---------------+------+-----------------+---------------+-------+------------+
|index| order_id|order_date|item_id|qty_ordered|price| value|discount_amount| total|         category| payment_method|cust_id|final_status|
+-----+---------+----------+-------+-----------+-----+------+---------------+------+-----------------+---------------+-------+------------+
|  296|100438643|  21/01/21| 718698|          2| NULL|6979.2|            0.0|6979.2|Mobiles & Tablets|            cod|   6702|   cancelled|
|  297|100439395|  24/01/21| 719994|          2| NULL|6979.2|            0.0|6979.2|Mobiles & Tablets|            cod|   6702|    received|
|  298|100439912|  25/01/21| 721015|          2| NULL| 255.9|            0.0| 255.9|    Men's Fashion|            cod|   6702|   cancelled|
|  407|100437996|  18/01/21| 717518|          2| NULL|1894.6|            0.0|1894.6|Mobiles & Tablets|     Easypay_MA|  55731|   cancelled|
|  606|100430519|  0

In [120]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType, DateType

# Define the schema for the JSON file
customers_schema = StructType([
    StructField("index", IntegerType(), True),
    StructField("order_id", StringType(), True),
    StructField("order_date", StringType(), True),
    StructField("item_id", StringType(), True),
    StructField("qty_ordered", IntegerType(), True),
    StructField("price", FloatType(), True),
    StructField("value", FloatType(), True),
    StructField("discount_amount", FloatType(), True),
    StructField("total", FloatType(), True),
    StructField("category", StringType(), True),
    StructField("payment_method", StringType(), True),
    StructField("cust_id", StringType(), True),
    StructField("final_status", StringType(), True)
])


In [121]:
from pyspark.sql import SparkSession

# Create or get Spark session
spark = SparkSession.builder.appName("ReadJSON").getOrCreate()

def readJsonFile(path):
    # Read the JSON file into a DataFrame without schema
    df = spark.read.json(path)

    # Show the inferred schema
    df.printSchema()

    # Display the DataFrame
    df.show(truncate=False)

# Example usage
readJsonFile("/content/sample_data/Sales_landing_zone/customers/customers.json")


root
 |-- City: string (nullable = true)
 |-- County: string (nullable = true)
 |-- Customer Since: string (nullable = true)
 |-- E Mail: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Place Name: string (nullable = true)
 |-- Region: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Zip: long (nullable = true)
 |-- age: double (nullable = true)
 |-- cust_id: double (nullable = true)
 |-- full_name: string (nullable = true)

+------------+-----------+--------------+----------------------------+------+------------+---------+-----+-----+----+-------+------------------+
|City        |County     |Customer Since|E Mail                      |Gender|Place Name  |Region   |State|Zip  |age |cust_id|full_name         |
+------------+-----------+--------------+----------------------------+------+------------+---------+-----+-----+----+-------+------------------+
|Vinson      |Harmon     |8/22/2006     |jani.titus@gmail.com        |F     |Vinson      |South   

In [122]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *

# Initialize Spark session
spark = SparkSession.builder \
    .appName("Read Orders Data") \
    .getOrCreate()

# Define schema for orders.csv
orders_schema = StructType([
    StructField("index", StringType(), True),
    StructField("order_id", StringType(), True),
    StructField("order_date", StringType(), True),
    StructField("item_id", StringType(), True),
    StructField("qty_ordered", IntegerType(), True),
    StructField("price", FloatType(), True),
    StructField("value", FloatType(), True),
    StructField("discount_amount", FloatType(), True),
    StructField("total", FloatType(), True),
    StructField("category", StringType(), True),
    StructField("payment_method", StringType(), True),
    StructField("cust_id", StringType(), True),
    StructField("final_status", StringType(), True)
])
# Read from landing zone into dataframe
orders_df.show(10)

# Convert 'order_date' to DateType
#Reformatting the Existing Order_date column
from pyspark.sql.functions import to_date, col, year, month

# Assuming orders_df is already defined and loaded
orders_df = orders_df.withColumn("order_date", to_date(col("order_date"), "dd/MM/yy"))

# Add 'year' and 'month' columns
orders_df = orders_df.withColumn('year', year(col("order_date"))) \
                     .withColumn('month', month(col("order_date")))

# Show the DataFrame to verify
orders_df.show(truncate=False)



+-----+---------+----------+-------+-----------+-----+------+---------------+------+-----------------+---------------+-------+------------+
|index| order_id|order_date|item_id|qty_ordered|price| value|discount_amount| total|         category| payment_method|cust_id|final_status|
+-----+---------+----------+-------+-----------+-----+------+---------------+------+-----------------+---------------+-------+------------+
|  296|100438643|  21/01/21| 718698|          2| NULL|6979.2|            0.0|6979.2|Mobiles & Tablets|            cod|   6702|   cancelled|
|  297|100439395|  24/01/21| 719994|          2| NULL|6979.2|            0.0|6979.2|Mobiles & Tablets|            cod|   6702|    received|
|  298|100439912|  25/01/21| 721015|          2| NULL| 255.9|            0.0| 255.9|    Men's Fashion|            cod|   6702|   cancelled|
|  407|100437996|  18/01/21| 717518|          2| NULL|1894.6|            0.0|1894.6|Mobiles & Tablets|     Easypay_MA|  55731|   cancelled|
|  606|100430519|  0

In [123]:
spark.sql("CREATE SCHEMA IF NOT EXISTS staging")


DataFrame[]

In [124]:
schemas = spark.sql("SHOW SCHEMAS")
schemas.show(truncate=False)


+---------+
|namespace|
+---------+
|default  |
|staging  |
+---------+



In [125]:
# Check if the database exists
databases = spark.sql("SHOW DATABASES")
print(databases)

# Check if the table exists
tables = spark.sql("SHOW TABLES IN staging")
print(tables)

DataFrame[namespace: string]
DataFrame[namespace: string, tableName: string, isTemporary: boolean]


In [126]:
def processCSVFile(path):
    # Read the CSV file into a DataFrame using the schema
    df = spark.read.csv(path, header=True, schema=orders_schema)

    # Convert 'order_date' to a proper DateType
    df = df.withColumn("order_date", to_date(col("order_date"), "dd-MM-yyyy"))

    # Extract 'month' and 'year' from 'order_date'
    df = df.withColumn('month', month(col("order_date"))) \
           .withColumn('year', year(col("order_date")))

    # Define the output path for Parquet files
    parquet_path = "/content/sample_data/Sales_landing_zone/orders/"

    # Write the DataFrame to Parquet files, partitioned by 'year' and 'month'
    df.write.format("parquet") \
           .partitionBy("year", "month") \
           .mode("append") \
           .save(parquet_path)


processCSVFile("/content/sample_data/Sales_landing_zone/orders/orders_01_21.csv")

In [127]:
from pyspark.sql import SparkSession

# Initialize Spark session with Hive support
spark = SparkSession.builder \
    .appName("Spark with Hive") \
    .config("spark.sql.catalogImplementation", "hive") \
    .enableHiveSupport() \
    .getOrCreate()


In [128]:
# Assuming you have a DataFrame `sales_df` with the schema defined

# Write DataFrame to a table
orders_df.write.mode("overwrite").saveAsTable("staging.orders")


In [129]:
spark.sql("DESCRIBE staging.orders").show(truncate=False)

+---------------+---------+-------+
|col_name       |data_type|comment|
+---------------+---------+-------+
|index          |string   |NULL   |
|order_id       |string   |NULL   |
|order_date     |date     |NULL   |
|item_id        |string   |NULL   |
|qty_ordered    |int      |NULL   |
|price          |int      |NULL   |
|value          |float    |NULL   |
|discount_amount|float    |NULL   |
|total          |float    |NULL   |
|category       |string   |NULL   |
|payment_method |string   |NULL   |
|cust_id        |string   |NULL   |
|final_status   |string   |NULL   |
|year           |int      |NULL   |
|month          |int      |NULL   |
+---------------+---------+-------+



In [130]:
processCSVFile("/content/sample_data/Sales_landing_zone/orders/orders_04_21.csv")

In [131]:
# Read the CSV file
orders_df = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("/content/sample_data/Sales_landing_zone/orders/orders_01_21.csv")

# Save as Parquet
orders_df.write.format("parquet") \
    .mode("overwrite") \
    .save("/content/sample_data/Sales_landing_zone/orders_parquet/")


In [132]:
orders_df.printSchema()

root
 |-- index: integer (nullable = true)
 |-- order_id: integer (nullable = true)
 |-- order_date: string (nullable = true)
 |-- item_id: integer (nullable = true)
 |-- qty_ordered: integer (nullable = true)
 |-- price: double (nullable = true)
 |-- value: double (nullable = true)
 |-- discount_amount: double (nullable = true)
 |-- total: double (nullable = true)
 |-- category: string (nullable = true)
 |-- payment_method: string (nullable = true)
 |-- cust_id: integer (nullable = true)
 |-- final_status: string (nullable = true)



In [133]:
orders_df.show(10)

+-----+---------+----------+-------+-----------+------+------+---------------+------+-----------------+---------------+-------+------------+
|index| order_id|order_date|item_id|qty_ordered| price| value|discount_amount| total|         category| payment_method|cust_id|final_status|
+-----+---------+----------+-------+-----------+------+------+---------------+------+-----------------+---------------+-------+------------+
|  296|100438643|  21/01/21| 718698|          2|6979.2|6979.2|            0.0|6979.2|Mobiles & Tablets|            cod|   6702|   cancelled|
|  297|100439395|  24/01/21| 719994|          2|6979.2|6979.2|            0.0|6979.2|Mobiles & Tablets|            cod|   6702|    received|
|  298|100439912|  25/01/21| 721015|          2| 255.9| 255.9|            0.0| 255.9|    Men's Fashion|            cod|   6702|   cancelled|
|  407|100437996|  18/01/21| 717518|          2|1894.6|1894.6|            0.0|1894.6|Mobiles & Tablets|     Easypay_MA|  55731|   cancelled|
|  606|100430

In [134]:
spark.sql("DESCRIBE staging.orders").show(truncate=False)

+---------------+---------+-------+
|col_name       |data_type|comment|
+---------------+---------+-------+
|index          |string   |NULL   |
|order_id       |string   |NULL   |
|order_date     |date     |NULL   |
|item_id        |string   |NULL   |
|qty_ordered    |int      |NULL   |
|price          |int      |NULL   |
|value          |float    |NULL   |
|discount_amount|float    |NULL   |
|total          |float    |NULL   |
|category       |string   |NULL   |
|payment_method |string   |NULL   |
|cust_id        |string   |NULL   |
|final_status   |string   |NULL   |
|year           |int      |NULL   |
|month          |int      |NULL   |
+---------------+---------+-------+



In [135]:
import os
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType
from pyspark.sql.functions import to_date, col, month, year
from datetime import datetime

# Create or get Spark session
spark = SparkSession.builder.appName("MonthlyDataIngestion").getOrCreate()

def processMonthlyData(input_dir):
    # List all files in the directory
    files = [os.path.join(input_dir, f) for f in os.listdir(input_dir) if f.endswith(".csv")]

    for file in files:
        # Read the CSV file into a DataFrame using the schema
        df = spark.read.csv(file, header=True, schema=orders_schema)

        # Convert 'order_date' to a proper DateType
        df = df.withColumn("order_date", to_date(col("order_date"), "dd-MM-yyyy"))

        # Extract 'month' and 'year' from 'order_date'
        df = df.withColumn('month', month(col("order_date"))) \
               .withColumn('year', year(col("order_date")))

        # Define the output path for Parquet files
        parquet_path = "/content/sample_data/Sales_landing_zone/orders/"

        # Write the DataFrame to Parquet files, partitioned by 'year' and 'month'
        df.write.format("parquet") \
               .partitionBy("year","month") \
               .mode("append") \
               .save(parquet_path)

# Example
input_directory = "/content/sample_data/Sales_landing_zone/orders/"
processMonthlyData(input_directory)


In [136]:
# Read CSV data with basePath option
stores_df = (spark.read
             .option("header", True)
             .option("inferSchema", True)
             .csv("/content/sample_data/Sales_landing_zone/orders/orders_01_21.csv"))


In [137]:
# Define the schema with partition columns
stores_schema = StructType([
    StructField("index", IntegerType(), True),
    StructField("order_id", IntegerType(), True),
    StructField("order_date", StringType(), True),
    StructField("item_id", IntegerType(), True),
    StructField("qty_ordered", IntegerType(), True),
    StructField("price", DoubleType(), True),
    StructField("value", DoubleType(), True),
    StructField("discount_amount", DoubleType(), True),
    StructField("total", DoubleType(), True),
    StructField("category", StringType(), True),
    StructField("payment_method", StringType(), True),
    StructField("cust_id", IntegerType(), True),
    StructField("final_status", StringType(), True),
    StructField("year", IntegerType(), True),
    StructField("month", IntegerType(), True)
])

# Read and write data with partitioning
stores_df = (spark.read
             .option("header", True)
             .option("inferSchema", True)
             .csv("/content/sample_data/Sales_landing_zone/orders/*.csv"))


# Parse the order_date column and extract year and month
stores_df = stores_df.withColumn("order_date", to_date(col("order_date"), "dd/MM/yy")) \
                     .withColumn("year", year(col("order_date"))) \
                     .withColumn("month", month(col("order_date")))

# Write to a new table with partition columns
stores_df.write.format("parquet") \
               .mode("overwrite") \
               .partitionBy("year", "month") \
               .saveAsTable("staging.stores")

stores_df.show(10)

+-----+---------+----------+-------+-----------+-----+-----+---------------+---------+-------------+--------------+-------+------------+----+-----+
|index| order_id|order_date|item_id|qty_ordered|price|value|discount_amount|    total|     category|payment_method|cust_id|final_status|year|month|
+-----+---------+----------+-------+-----------+-----+-----+---------------+---------+-------------+--------------+-------+------------+----+-----+
|   12|100403034|2020-12-24| 656937|          2|254.8|254.8|       39.80628|214.99372|   Appliances|       Easypay|  42485|   cancelled|2020|   12|
|   13|100403034|2020-12-24| 656938|          2|315.5|315.5|       49.28917|266.21083|   Appliances|       Easypay|  42485|   cancelled|2020|   12|
|   14|100403034|2020-12-24| 656939|          2| 69.8| 69.8|       10.90455| 58.89545|Home & Living|       Easypay|  42485|   cancelled|2020|   12|
|   15|100403077|2020-12-24| 657023|          2|254.8|254.8|       39.80628|214.99372|   Appliances|       Easyp

In [138]:
# Execute SQL query
result_df = spark.sql("SELECT * FROM staging.stores LIMIT 10")

# Display the result
result_df.show()

# List all tables in the staging database
spark.sql("SHOW TABLES IN staging").show()



+-----+---------+----------+-------+-----------+-----+-----+---------------+---------+-------------+--------------+-------+------------+----+-----+
|index| order_id|order_date|item_id|qty_ordered|price|value|discount_amount|    total|     category|payment_method|cust_id|final_status|year|month|
+-----+---------+----------+-------+-----------+-----+-----+---------------+---------+-------------+--------------+-------+------------+----+-----+
|   12|100403034|2020-12-24| 656937|          2|254.8|254.8|       39.80628|214.99372|   Appliances|       Easypay|  42485|   cancelled|2020|   12|
|   13|100403034|2020-12-24| 656938|          2|315.5|315.5|       49.28917|266.21083|   Appliances|       Easypay|  42485|   cancelled|2020|   12|
|   14|100403034|2020-12-24| 656939|          2| 69.8| 69.8|       10.90455| 58.89545|Home & Living|       Easypay|  42485|   cancelled|2020|   12|
|   15|100403077|2020-12-24| 657023|          2|254.8|254.8|       39.80628|214.99372|   Appliances|       Easyp

In [139]:
# Check existing tables and their schemas
spark.sql("SHOW TABLES IN staging").show()
spark.sql("DESCRIBE TABLE staging.orders").show(truncate=False)


+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
|  staging|   orders|      false|
|  staging|   stores|      false|
+---------+---------+-----------+

+---------------+---------+-------+
|col_name       |data_type|comment|
+---------------+---------+-------+
|index          |string   |NULL   |
|order_id       |string   |NULL   |
|order_date     |date     |NULL   |
|item_id        |string   |NULL   |
|qty_ordered    |int      |NULL   |
|price          |int      |NULL   |
|value          |float    |NULL   |
|discount_amount|float    |NULL   |
|total          |float    |NULL   |
|category       |string   |NULL   |
|payment_method |string   |NULL   |
|cust_id        |string   |NULL   |
|final_status   |string   |NULL   |
|year           |int      |NULL   |
|month          |int      |NULL   |
+---------------+---------+-------+

