In [0]:

/FileStore/tables/Sales_SalesOrderDetail-1.csv

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, current_date, add_months, year, row_number
from pyspark.sql.window import Window
from datetime import datetime

# Create a Spark session
spark = SparkSession.builder \
    .appName("SalesDataProcessing") \
    .getOrCreate()

# Define schema with only important columns
from pyspark.sql.types import StructType, StructField, IntegerType, DecimalType, DateType

schema = StructType([
    StructField("SalesOrderID", IntegerType(), True),
    StructField("OrderQty", IntegerType(), True),
    StructField("ProductID", IntegerType(), True),
    StructField("LineTotal", DecimalType(20, 2), True),
    StructField("ModifiedDate", DateType(), True)
])

# Load data into DataFrame
df = spark.read.csv("/FileStore/tables/Sales_SalesOrderDetail.csv", schema=schema, header=False)

# Create a DataFrame with the year extracted from the ModifiedDate
df_with_year = df.withColumn("Year", year(col("ModifiedDate")))

# Create DataFrames for each of the last 15 years with global ranking
for year_offset in range(15):
    # Calculate the end date as the current date minus (year_offset * 12) months
    end_date_expr = add_months(current_date(), -year_offset * 12)
    
    # Calculate the start date as end_date minus 12 months
    start_date_expr = add_months(end_date_expr, -12)
    
    # Filter the data for the specific year
    filtered_df = df_with_year.filter(
        (col("ModifiedDate") >= start_date_expr) & (col("ModifiedDate") < end_date_expr)
    )
    
    # Define the window specification for global ranking
    window_spec = Window.orderBy(col("LineTotal").desc())
    
    # Add ranking to the DataFrame
    ranked_df = filtered_df.withColumn("Rank", row_number().over(window_spec))
    
    # Show the filtered and ranked DataFrame
    print(f"Sales Data for Year {datetime.now().year - year_offset} with Global Ranking:")
    ranked_df.select("SalesOrderID", "OrderQty", "ProductID", "LineTotal", "ModifiedDate", "Year", "Rank").show()


Sales Data for Year 2024 with Global Ranking:
+------------+--------+---------+---------+------------+----+----+
|SalesOrderID|OrderQty|ProductID|LineTotal|ModifiedDate|Year|Rank|
+------------+--------+---------+---------+------------+----+----+
+------------+--------+---------+---------+------------+----+----+

Sales Data for Year 2023 with Global Ranking:
+------------+--------+---------+---------+------------+----+----+
|SalesOrderID|OrderQty|ProductID|LineTotal|ModifiedDate|Year|Rank|
+------------+--------+---------+---------+------------+----+----+
+------------+--------+---------+---------+------------+----+----+

Sales Data for Year 2022 with Global Ranking:
+------------+--------+---------+---------+------------+----+----+
|SalesOrderID|OrderQty|ProductID|LineTotal|ModifiedDate|Year|Rank|
+------------+--------+---------+---------+------------+----+----+
+------------+--------+---------+---------+------------+----+----+

Sales Data for Year 2021 with Global Ranking:
+--------

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, month
from pyspark.sql.types import StructType, StructField, IntegerType, DecimalType, DateType

# Create a Spark session
spark = SparkSession.builder \
    .appName("SalesDataProcessing") \
    .getOrCreate()

# Define schema with only important columns
schema = StructType([
    StructField("SalesOrderID", IntegerType(), True),
    StructField("OrderQty", IntegerType(), True),
    StructField("ProductID", IntegerType(), True),
    StructField("LineTotal", DecimalType(20, 2), True),
    StructField("ModifiedDate", DateType(), True)
])

# Load data into DataFrame
df = spark.read.csv("/FileStore/tables/Sales_SalesOrderDetail.csv", schema=schema, header=False)

# Display the initial DataFrame
print("Initial DataFrame:")
df.show()

# Create a DataFrame with the month extracted from the ModifiedDate
df_with_month = df.withColumn("Month", month(col("ModifiedDate")))

# Display the DataFrame with the month column
print("DataFrame with Month column:")
df_with_month.show()

# List to store DataFrames for each month
monthly_dfs = []

# Loop through each month to create separate DataFrames
for month_value in range(1, 13):  # 1 through 12 for January to December
    # Filter the data for the specific month
    month_df = df_with_month.filter(col("Month") == month_value)
    
    # Add the filtered DataFrame to the list
    monthly_dfs.append(month_df)
    
    # Show the filtered DataFrame for the specific month
    print(f"Sales Data for Month {month_value}:")
    month_df.show()

# Now you have a list of DataFrames `monthly_dfs` where each entry corresponds to a different month


Initial DataFrame:
+------------+--------+---------+---------+------------+
|SalesOrderID|OrderQty|ProductID|LineTotal|ModifiedDate|
+------------+--------+---------+---------+------------+
|       43659|       1|     null|     1.00|        null|
|       43659|       2|     null|     3.00|        null|
|       43659|       3|     null|     1.00|        null|
|       43659|       4|     null|     1.00|        null|
|       43659|       5|     null|     1.00|        null|
|       43659|       6|     null|     2.00|        null|
|       43659|       7|     null|     1.00|        null|
|       43659|       8|     null|     3.00|        null|
|       43659|       9|     null|     1.00|        null|
|       43659|      10|     null|     6.00|        null|
|       43659|      11|     null|     2.00|        null|
|       43659|      12|     null|     4.00|        null|
|       43660|      13|     null|     1.00|        null|
|       43660|      14|     null|     1.00|        null|
|       4366

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, month
from pyspark.sql.types import StructType, StructField, IntegerType, DecimalType, DateType
from datetime import date
import decimal

# Create a Spark session
spark = SparkSession.builder \
    .appName("SalesDataProcessing") \
    .getOrCreate()

# Define schema
schema = StructType([
    StructField("SalesOrderID", IntegerType(), True),
    StructField("OrderQty", IntegerType(), True),
    StructField("ProductID", IntegerType(), True),
    StructField("LineTotal", DecimalType(20, 2), True),
    StructField("ModifiedDate", DateType(), True)
])

# Sample data with Decimal conversion
data = [
    (1, 10, 101, decimal.Decimal(500.00), date(2024, 1, 15)),
    (2, 5, 102, decimal.Decimal(250.00), date(2024, 2, 10)),
    (3, 8, 103, decimal.Decimal(400.00), date(2024, 3, 5)),
    (4, 12, 104, decimal.Decimal(600.00), date(2024, 4, 22)),
    (5, 7, 105, decimal.Decimal(350.00), date(2024, 5, 30)),
    (6, 9, 106, decimal.Decimal(450.00), date(2024, 6, 18)),
    (7, 11, 107, decimal.Decimal(550.00), date(2024, 7, 9)),
    (8, 6, 108, decimal.Decimal(300.00), date(2024, 8, 25)),
    (9, 14, 109, decimal.Decimal(700.00), date(2024, 9, 13)),
    (10, 4, 110, decimal.Decimal(200.00), date(2024, 10, 2)),
    (11, 13, 111, decimal.Decimal(650.00), date(2024, 11, 20)),
    (12, 15, 112, decimal.Decimal(750.00), date(2024, 12, 15)),
]

# Create DataFrame with sample data
df = spark.createDataFrame(data, schema)

# Display the initial DataFrame
print("Initial DataFrame:")
df.show()

# Create a DataFrame with the month extracted from the ModifiedDate
df_with_month = df.withColumn("Month", month(col("ModifiedDate")))

# Display the DataFrame with the month column
print("DataFrame with Month column:")
df_with_month.show()

# List to store DataFrames for each month
monthly_dfs = []

# Loop through each month to create separate DataFrames
for month_value in range(1, 13):  # 1 through 12 for January to December
    # Filter the data for the specific month
    month_df = df_with_month.filter(col("Month") == month_value)
    
    # Add the filtered DataFrame to the list
    monthly_dfs.append(month_df)
    
    # Show the filtered DataFrame for the specific month
    print(f"Sales Data for Month {month_value}:")
    month_df.show()

# Now you have a list of DataFrames `monthly_dfs` where each entry corresponds to a different month


Initial DataFrame:
+------------+--------+---------+---------+------------+
|SalesOrderID|OrderQty|ProductID|LineTotal|ModifiedDate|
+------------+--------+---------+---------+------------+
|           1|      10|      101|   500.00|  2024-01-15|
|           2|       5|      102|   250.00|  2024-02-10|
|           3|       8|      103|   400.00|  2024-03-05|
|           4|      12|      104|   600.00|  2024-04-22|
|           5|       7|      105|   350.00|  2024-05-30|
|           6|       9|      106|   450.00|  2024-06-18|
|           7|      11|      107|   550.00|  2024-07-09|
|           8|       6|      108|   300.00|  2024-08-25|
|           9|      14|      109|   700.00|  2024-09-13|
|          10|       4|      110|   200.00|  2024-10-02|
|          11|      13|      111|   650.00|  2024-11-20|
|          12|      15|      112|   750.00|  2024-12-15|
+------------+--------+---------+---------+------------+

DataFrame with Month column:
+------------+--------+---------+------