In [0]:
#imports

from pyspark.sql.types import StructType, StructField, DateType, StringType, IntegerType, FloatType,TimestampType
from pyspark.sql.functions import col, to_timestamp\
    

# Typed layout for the CSV columns, built field by field as
# add(column name, Spark type, nullable).
schema = (
    StructType()
    .add("Order ID", IntegerType(), False)
    .add("Product", StringType(), False)
    .add("Quantity Ordered", IntegerType(), False)
    .add("Price Each", FloatType(), False)
    .add("Order Date", TimestampType(), False)
    .add("Purchase Address", StringType(), True)
)

# All-string layout carrying the renamed column names; nullability is left at
# the StructField default.
new_schema = StructType([
    StructField(column_name, StringType())
    for column_name in (
        "Order_id",
        "Product",
        "Quantity",
        "Price",
        "Order_date",
        "Address",
    )
])


In [0]:

# Load the three monthly sales CSV files into a single DataFrame. Schema
# inference is switched off, so every column is read as a string.
df = (
    spark.read.format("csv")
    .option("inferSchema", False)
    .option("header", True)
    .option("sep", ',')
    .load(["dbfs:/FileStore/Sales_January_2019.csv","dbfs:/FileStore/Sales_February_2019.csv","dbfs:/FileStore/Sales_March_2019.csv"])
)

df.printSchema()

# Create a DataFrame with the renamed column names declared in new_schema.
# The columns are already strings, so a positional rename with toDF produces
# the same all-string schema without serialising every row through a Python
# RDD (which createDataFrame(df.rdd, ...) would do).
df2 = df.toDF(*new_schema.fieldNames())

df2.printSchema()


root
 |-- Order ID: string (nullable = true)
 |-- Product: string (nullable = true)
 |-- Quantity Ordered: string (nullable = true)
 |-- Price Each: string (nullable = true)
 |-- Order Date: string (nullable = true)
 |-- Purchase Address: string (nullable = true)

root
 |-- Order_id: string (nullable = true)
 |-- Product: string (nullable = true)
 |-- Quantity: string (nullable = true)
 |-- Price: string (nullable = true)
 |-- Order_date: string (nullable = true)
 |-- Address: string (nullable = true)



In [0]:
# Handling inconsistent data

# Parse the textual order date (pattern "MM/dd/yy HH:mm") into a timestamp.
# withColumn already names the resulting column, so no extra alias is needed.
df3 = df2.withColumn("Order_date", to_timestamp(col("Order_date"), "MM/dd/yy HH:mm"))

# Create a temporary view for the DataFrame to use Spark SQL
df3.createOrReplaceTempView("Orders")


# Drop rows whose Order_id is NULL or equals the header text 'Order ID', add a
# per-row `sales` column (Quantity * Price) and expose the result as the temp
# view 'Orders2'.
# createOrReplaceTempView returns None, so the DataFrame is bound to df4 first
# and the view is registered in a separate statement.
df4 = spark.sql(sqlQuery="SELECT *,(Quantity*Price) as sales FROM Orders WHERE Order_id IS NOT NULL AND Order_id <> 'Order ID' ORDER BY Order_date DESC")
df4.createOrReplaceTempView("Orders2")


# show() only prints and returns None, so keep the DataFrame in df5 and
# display it as a separate step.
df5 = spark.sql("select * from Orders2")
df5.show(truncate=False)


+--------+--------------------------+--------+------+-------------------+---------------------------------------+------+
|Order_id|Product                   |Quantity|Price |Order_date         |Address                                |sales |
+--------+--------------------------+--------+------+-------------------+---------------------------------------+------+
|174575  |AA Batteries (4-pack)     |1       |3.84  |2019-04-01 03:21:00|969 Adams St, Boston, MA 02215         |3.84  |
|165622  |Lightning Charging Cable  |1       |14.95 |2019-04-01 02:59:00|65 Madison St, Boston, MA 02215        |14.95 |
|167813  |27in 4K Gaming Monitor    |1       |389.99|2019-04-01 02:42:00|840 11th St, Seattle, WA 98101         |389.99|
|175769  |Bose SoundSport Headphones|1       |99.99 |2019-04-01 02:22:00|974 13th St, New York City, NY 10001   |99.99 |
|166309  |AA Batteries (4-pack)     |1       |3.84  |2019-04-01 01:32:00|2 Church St, Seattle, WA 98101         |3.84  |
|169177  |Macbook Pro Laptop    

In [0]:
#2. Get the date on which max sales was done by a product in these 3 months
# NOTE(review): Order_date is a timestamp parsed with minute resolution, so this
# groups per timestamp rather than per calendar day - confirm that is intended.
# show() returns None, so the DataFrame is kept in the variable and displayed
# in a separate statement.
max_sales_date_one_product_df = spark.sql(sqlQuery="select Product,Order_date,SUM(Quantity*Price) AS sales FROM Orders2 group by Order_date,Product order by sales desc,Order_date desc limit 1")
max_sales_date_one_product_df.show(truncate=False)

+------------------+-------------------+------+
|Product           |Order_date         |sales |
+------------------+-------------------+------+
|Macbook Pro Laptop|2019-03-31 10:12:00|3400.0|
+------------------+-------------------+------+



In [0]:
#3. Get the date on which max sales was done for all products in these 3 months
# NOTE(review): this query has no LIMIT and groups by (Order_date, Product), so
# it lists every group ordered by sales rather than a single date - confirm
# this matches the question being asked.
# show() returns None, so the DataFrame is kept in the variable and displayed
# in a separate statement.
max_sales_date_all_product_df = spark.sql(sqlQuery="select Product,Order_date,SUM(Quantity*Price) AS sales FROM Orders2 group by Order_date,Product order by sales desc,Order_date desc")
max_sales_date_all_product_df.show(truncate=False)



+------------------+-------------------+-------+
|Product           |Order_date         |sales  |
+------------------+-------------------+-------+
|Macbook Pro Laptop|2019-03-31 10:12:00|3400.0 |
|Macbook Pro Laptop|2019-03-30 10:47:00|3400.0 |
|Macbook Pro Laptop|2019-03-29 21:17:00|3400.0 |
|Macbook Pro Laptop|2019-03-05 21:25:00|3400.0 |
|Macbook Pro Laptop|2019-02-26 17:12:00|3400.0 |
|Macbook Pro Laptop|2019-02-22 21:06:00|3400.0 |
|Macbook Pro Laptop|2019-01-20 00:15:00|3400.0 |
|ThinkPad Laptop   |2019-03-31 22:20:00|1999.98|
|ThinkPad Laptop   |2019-03-25 18:39:00|1999.98|
|ThinkPad Laptop   |2019-03-22 11:55:00|1999.98|
|ThinkPad Laptop   |2019-03-16 13:17:00|1999.98|
|ThinkPad Laptop   |2019-03-11 09:38:00|1999.98|
|ThinkPad Laptop   |2019-02-21 17:24:00|1999.98|
|ThinkPad Laptop   |2019-01-31 17:47:00|1999.98|
|Macbook Pro Laptop|2019-04-01 01:20:00|1700.0 |
|Macbook Pro Laptop|2019-03-31 22:25:00|1700.0 |
|Macbook Pro Laptop|2019-03-31 19:19:00|1700.0 |
|Macbook Pro Laptop|

In [0]:
#4. Get the average sales value for each product in these 3 months
# show() returns None, so the DataFrame is kept in the variable and displayed
# in a separate statement.
Avg_sales_per_product_df = spark.sql(sqlQuery="select Product,AVG(Quantity*Price) AS sales FROM Orders2 group by Product order by Sales desc")
Avg_sales_per_product_df.show(truncate=False)


+--------------------------+------------------+
|Product                   |sales             |
+--------------------------+------------------+
|Macbook Pro Laptop        |1700.0            |
|ThinkPad Laptop           |1001.1847311827881|
|iPhone                    |700.5166051660517 |
|Google Phone              |601.5530629853322 |
|LG Dryer                  |600.0             |
|LG Washing Machine        |600.0             |
|Vareebadd Phone           |400.9367681498829 |
|27in 4K Gaming Monitor    |390.95214638157285|
|34in Ultrawide Monitor    |381.20402555909936|
|Flatscreen TV             |301.3114754098361 |
|Apple Airpods Headphones  |150.76628352490422|
|27in FHD Monitor          |150.69182486630766|
|20in Monitor              |110.69326086956595|
|Bose SoundSport Headphones|100.9999999999964 |
|Lightning Charging Cable  |16.03862666034043 |
|USB-C Charging Cable      |13.095375982041809|
|Wired Headphones          |13.036129119394804|
|AA Batteries (4-pack)     |5.1748571428

In [0]:
#5. Add a new column "salesdiff" holding the difference between the sales of the
# current row and the sales of the next row within the same product. Rows are
# ordered by Order_date descending, so the next row is the previous order in time.
# NOTE(review): several rows of one product can share the same Order_date; the
# window has no tie-breaker, so the row picked by LEAD among ties is not fixed -
# consider adding Order_id to the ORDER BY if a stable result is required.

# createOrReplaceTempView returns None, so the DataFrame is bound first and the
# 'Salesdiff' view is registered in a separate statement.
salesdiff_df = spark.sql("SELECT *,sales - LEAD(sales, 1) OVER (PARTITION BY Product ORDER BY Order_date DESC) AS salesdiff FROM Orders2")
salesdiff_df.createOrReplaceTempView("Salesdiff")


# show() returns None as well, so keep the DataFrame and display it separately.
Final_salesdiff_df = spark.sql("select * from Salesdiff")
Final_salesdiff_df.show(truncate=False)




+--------+------------+--------+------+-------------------+-----------------------------------------+------+---------+
|Order_id|Product     |Quantity|Price |Order_date         |Address                                  |sales |salesdiff|
+--------+------------+--------+------+-------------------+-----------------------------------------+------+---------+
|169023  |20in Monitor|1       |109.99|2019-03-31 22:02:00|404 Chestnut St, San Francisco, CA 94016 |109.99|0.0      |
|169504  |20in Monitor|1       |109.99|2019-03-31 19:38:00|338 Meadow St, Los Angeles, CA 90001     |109.99|0.0      |
|174561  |20in Monitor|1       |109.99|2019-03-31 19:38:00|337 River St, Portland, OR 97035         |109.99|0.0      |
|172744  |20in Monitor|1       |109.99|2019-03-31 16:37:00|476 Jefferson St, San Francisco, CA 94016|109.99|0.0      |
|171872  |20in Monitor|1       |109.99|2019-03-31 13:57:00|466 Pine St, San Francisco, CA 94016     |109.99|0.0      |
|168204  |20in Monitor|1       |109.99|2019-03-3

In [0]:
#6. Get the order id and purchase address of the order with the max sales in all the 3 months

# show() returns None, so the DataFrame is kept in the variable and displayed
# in a separate statement.
max_sales_order_id_address_df = spark.sql(sqlQuery="select Order_id,Address,SUM(Quantity*Price) as sales from  Orders2 group by Order_id,Address order by sales desc limit 1")
max_sales_order_id_address_df.show(truncate=False)

+--------+------------------------------------+------+
|Order_id|Address                             |sales |
+--------+------------------------------------+------+
|150518  |847 10th St, San Francisco, CA 94016|2400.0|
+--------+------------------------------------+------+



In [0]:
#7. Extract the city from the purchase address column (the 2nd element of the
# comma-separated string) and determine the city with the most orders in these 3 months
from pyspark.sql.functions import split, col, count, desc

# The inner SUBSTRING_INDEX keeps everything up to the 2nd comma and the outer
# one keeps the part after the last remaining comma. TRIM removes the space
# that follows the comma delimiter so city values carry no leading whitespace.
# createOrReplaceTempView returns None, so the DataFrame is bound first and the
# 'OrdersWithCity' view is registered in a separate statement.
df_city2 = spark.sql("SELECT TRIM(SUBSTRING_INDEX(SUBSTRING_INDEX(Address, ',', 2), ',', -1)) AS City FROM Orders2")
df_city2.createOrReplaceTempView("OrdersWithCity")


address_city_df=spark.sql("select * from OrdersWithCity")

#address_city_df.show()

# show() returns None, so keep the DataFrame in the variable and display it
# in a separate statement.
Max_orders_from_city_df = spark.sql("select city,count(*) as No_of_orders from OrdersWithCity group by city order by No_of_orders desc limit 1 ")
Max_orders_from_city_df.show()


+--------------+------------+
|          city|No_of_orders|
+--------------+------------+
| San Francisco|        8863|
+--------------+------------+



In [0]:
#8. Get the total order count details for each city in all the 3 months

# show() returns None, so the DataFrame is kept in the variable and displayed
# in a separate statement.
Total_orders_from_cities_df = spark.sql("select city,count(*) as No_of_orders from OrdersWithCity where city IS NOT NULL group by city order by No_of_orders desc")
Total_orders_from_cities_df.show()


+--------------+------------+
|          city|No_of_orders|
+--------------+------------+
| San Francisco|        8863|
|   Los Angeles|        5886|
| New York City|        4895|
|        Boston|        3952|
|        Dallas|        2968|
|       Atlanta|        2967|
|       Seattle|        2855|
|      Portland|        2442|
|        Austin|        1993|
+--------------+------------+

