### **Data Transformations**

#### Reading Data into a Dataframe

In [None]:
# First pass over the raw files: every CSV in the sales folder is loaded with
# no schema and no header option supplied.
df = spark.read.format('csv').load('abfss://data@synapsedp203dl.dfs.core.windows.net/sales/*.csv')

In [None]:
# Preview the first 10 rows of the freshly loaded DataFrame.
display(df.limit(10))

#### Provide Schema for the Data

In [None]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Explicit schema for the sales order files: each column name is paired with its
# Spark type. StructField's nullable flag is left at its default for every column.
orderSchema = StructType([
    StructField(column_name, column_type)
    for column_name, column_type in [
        ("SalesOrderNumber", StringType()),
        ("SalesOrderLineNumber", IntegerType()),
        ("OrderDate", DateType()),
        ("CustomerName", StringType()),
        ("Email", StringType()),
        ("Item", StringType()),
        ("Quantity", IntegerType()),
        ("UnitPrice", FloatType()),
        ("Tax", FloatType()),
    ]
])

# Re-read the same CSV files, this time applying the schema above and enabling
# the reader's header option.
df = (
    spark.read
    .format('csv')
    .schema(orderSchema)
    .option('header', True)
    .load('abfss://data@synapsedp203dl.dfs.core.windows.net/sales/*.csv')
)
  

In [None]:
# Preview the first 10 rows again, now that the schema and header option are applied.
display(df.limit(10))

In [None]:
 # Print df's schema (column names and data types).
 df.printSchema()

#### Transformations for Customer Data

In [None]:
# Keep only the customer-identifying columns from the full order data.
customers = df.select('CustomerName', 'Email')

# Row count before and after de-duplication, followed by the unique rows themselves.
print(customers.count())
print(customers.distinct().count())
display(customers.distinct())

In [None]:
from pyspark.sql.functions import split, col

# Split CustomerName on single spaces: token 0 becomes FirstName and token 1
# becomes LastName. CustomerName itself is not carried into the result, which
# leaves the columns Email, FirstName, LastName.
customer_data_df = customers.select(
    "Email",
    split(col("CustomerName"), " ").getItem(0).alias("FirstName"),
    split(col("CustomerName"), " ").getItem(1).alias("LastName"),
)

display(customer_data_df.limit(10))
     


In [None]:
# Put the name columns ahead of Email, then preview the first 10 rows.
customer_data_df = customer_data_df.select(['FirstName', 'LastName', 'Email'])
display(customer_data_df.limit(10))

In [None]:
# Save the customer data as Parquet, replacing whatever is already at the target path.
customer_data_df.write.parquet(
    'abfss://data@synapsedp203dl.dfs.core.windows.net/sales/transformed/customer_data',
    mode="overwrite",
)
print("Transformed data saved!")

Read the saved customer data back from the data lake to confirm the write succeeded.

In [None]:
# Load the Parquet output written for the customer data and show it.
orders_c = spark.read.load(
    'abfss://data@synapsedp203dl.dfs.core.windows.net/sales/transformed/customer_data',
    format='parquet',
)
display(orders_c)

#### Transformations for Product Data

In [None]:
# Total quantity per item; the aggregate column comes out named "sum(Quantity)".
productQuantitySold = df.groupBy("Item").sum("Quantity")
display(productQuantitySold)

In [None]:
# Give the generated "sum(Quantity)" column a friendlier name.
productQuantitySold = productQuantitySold.select(
    "Item",
    productQuantitySold["sum(Quantity)"].alias("Total_Quantity"),
)

In [None]:
# Show the per-item quantity totals with the renamed Total_Quantity column.
display(productQuantitySold)

In [None]:
# Narrow the order data to the columns needed to compute revenue per line.
ItemSales = df.select(["Item", "Quantity", "UnitPrice"])
display(ItemSales)

In [None]:
# Line revenue = quantity x unit price, stored in a new "Total Sales" column.
productSales = ItemSales.withColumn(
    "Total Sales",
    ItemSales["Quantity"] * ItemSales["UnitPrice"],
)

In [None]:
# Show each row together with its computed "Total Sales" value.
display(productSales)

In [None]:
# Sum the line revenue per item; the aggregate column is named "sum(Total Sales)".
productSales = productSales.groupBy("Item").sum("Total Sales")
display(productSales)

In [None]:
# Round the per-item revenue to 2 decimal places and expose it as Total_Sales.
# `round` here is pyspark.sql.functions.round (it takes a column), not the builtin.
productSales = productSales.select(
    'Item',
    round(productSales['sum(Total Sales)'], 2).alias('Total_Sales'),
)
display(productSales)

In [None]:
# Combine quantity and revenue per product.
# productQuantitySold and productSales are both aggregations of the same `df`,
# so a join expression of the form `left.Item == right.Item` compares two
# references to the same source column and leaves two `Item` columns in the
# joined frame. Joining on the column *name* is an unambiguous equi-join that
# keeps a single `Item` column, so the final projection can use plain names.
salesByProduct = (
    productQuantitySold
    .join(productSales, on="Item", how="inner")
    .select("Item", "Total_Quantity", "Total_Sales")
)

display(salesByProduct)

In [None]:
# Save the per-product summary as Parquet, replacing any earlier output at this path.
(
    salesByProduct.write
    .format("parquet")
    .mode("overwrite")
    .save('abfss://data@synapsedp203dl.dfs.core.windows.net/sales/transformed/salesByProduct')
)
print("Transformed data saved!")

Read the saved sales-by-product data back from the data lake to confirm the write succeeded.

In [None]:
# Load the Parquet output written for the per-product summary and show it.
orders_p = (
    spark.read
    .format('parquet')
    .load('abfss://data@synapsedp203dl.dfs.core.windows.net/sales/transformed/salesByProduct')
)
display(orders_p)

#### Transformations for MonthlySales Data

In [None]:
# Columns needed for the time-based sales roll-ups.
sales = df.select(["SalesOrderNumber", "OrderDate", "Quantity", "UnitPrice"])
display(sales)

In [None]:
# Daily totals per OrderDate.
# The schema defines a SalesOrderLineNumber column, which suggests one order can
# span several rows. A plain count() of SalesOrderNumber would therefore report
# order *lines*; countDistinct() reports the number of distinct orders, which is
# what the OrderCount column name promises.
# `sum` and `countDistinct` are the pyspark.sql.functions versions brought in by
# the wildcard import earlier in the notebook (not the Python builtin `sum`).
dailySales = (
    sales
    .withColumn("TotalSales", col("Quantity") * col("UnitPrice"))
    .groupBy("OrderDate")
    .agg(
        countDistinct("SalesOrderNumber").alias("OrderCount"),
        sum("TotalSales").alias("Total sales"),
    )
)
display(dailySales)

In [None]:
# Roll the daily figures up to (Year, Month) and sort them chronologically.
# `sum`, `year` and `month` are the pyspark.sql.functions versions.
MonthlySales = (
    dailySales
    .groupBy(
        year("OrderDate").alias("Year"),
        month("OrderDate").alias("Month"),
    )
    .agg(
        sum("OrderCount").alias("TotalOrderCount"),
        sum("Total sales").alias("TotalSales"),
    )
    .orderBy("Year", "Month")
)
display(MonthlySales)

In [None]:
# Replace the raw TotalSales figure with a copy rounded to 2 decimal places,
# named Total_Sales; the remaining columns keep their order.
MonthlySales = (
    MonthlySales
    .withColumn('Total_Sales', round('TotalSales', 2))
    .drop('TotalSales')
)

In [None]:
# Show the monthly summary with the rounded Total_Sales column.
display(MonthlySales)

In [None]:
# Save the monthly summary as Parquet, replacing any earlier output at this path.
MonthlySales.write.save(
    'abfss://data@synapsedp203dl.dfs.core.windows.net/sales/transformed/MonthlySales',
    format="parquet",
    mode="overwrite",
)
print("Transformed data saved!")

Read the saved monthly sales data back from the data lake to confirm the write succeeded.

In [None]:
# Load the Parquet output written for the monthly summary and show it.
orders_m = spark.read.load(
    'abfss://data@synapsedp203dl.dfs.core.windows.net/sales/transformed/MonthlySales',
    format='parquet',
)
display(orders_m)

#### Filtering Data

In [None]:
# Customers who bought one specific product.
# The filter on Item is applied *before* the projection, so the predicate never
# references a column that the projected frame (CustomerName, Email) no longer
# contains. The resulting rows are the same; the query no longer depends on the
# analyzer reaching back through the projection to find Item.
customers = (
    df
    .where(df['Item'] == 'Road-250 Red, 52')
    .select("CustomerName", "Email")
)

# Row count before and after de-duplication, followed by the unique rows themselves.
print(customers.count())
print(customers.distinct().count())
display(customers.distinct())

#### Partitioning Data

In [None]:
# Order-level columns that will be written out partitioned by date parts.
salesOrders = df.select([
    "SalesOrderNumber",
    "OrderDate",
    "CustomerName",
    "Item",
    "Quantity",
])
display(salesOrders)

In [None]:
# Derive Year and Month columns from OrderDate.
salesOrders = (
    salesOrders
    .withColumn("Year", year("OrderDate"))
    .withColumn("Month", month("OrderDate"))
)

In [None]:
# Show the orders with the added Year and Month columns.
display(salesOrders)

In [None]:
# Keep every column except OrderDate (Year and Month were derived from it above).
salesOrders = salesOrders.select(
    [name for name in salesOrders.columns if name != "OrderDate"]
)
display(salesOrders)

In [None]:
# Write the orders as Parquet partitioned by Year and then Month, replacing any
# earlier output at this path.
salesOrders.write.parquet(
    "abfss://data@synapsedp203dl.dfs.core.windows.net/sales/transformed/Partitioned_data",
    mode="overwrite",
    partitionBy=["Year", "Month"],
)
print("Transformed data saved!")

#### Read Partitioned Data

In [None]:
# Load every Month=* folder beneath the Year=2020 partition directory.
# NOTE(review): the path points inside the partition tree, so Year/Month may not
# come back as columns in the result - confirm if they are needed downstream.
orders_2020 = (
    spark.read
    .format('parquet')
    .load('abfss://data@synapsedp203dl.dfs.core.windows.net/sales/transformed/Partitioned_data/Year=2020/Month=*')
)
display(orders_2020)

In [None]:
# Load only the Year=2020/Month=12 partition folder. This rebinds orders_2020,
# so from here on the name holds a single month rather than the whole year.
orders_2020 = spark.read.load(
    'abfss://data@synapsedp203dl.dfs.core.windows.net/sales/transformed/Partitioned_data/Year=2020/Month=12',
    format='parquet',
)
display(orders_2020)

#### Save data in a Table

In [None]:
# Persist the full order data as a Parquet-backed table named sales_orders in
# overwrite mode, with its files stored at an explicit data-lake path.
(
    df.write
    .format('parquet')
    .mode('overwrite')
    .option('path', 'abfss://data@synapsedp203dl.dfs.core.windows.net/sales/transformed/Sales_Orders')
    .saveAsTable('sales_orders')
)
print("Transformed data saved!")

#### Using SQL in a Python Notebook and Reading data from Tables

In [None]:
%%sql
-- Create the SparkDB database unless it already exists.
CREATE DATABASE IF NOT EXISTS SparkDB

In [None]:
# Persist the order data as SparkDB.sales_orders (Parquet, overwrite mode).
# NOTE(review): this uses the same storage path as the earlier sales_orders
# table, so both tables point at one location - confirm that is intended.
df.write.format('parquet').saveAsTable(
    'SparkDB.sales_orders',
    mode='overwrite',
    path='abfss://data@synapsedp203dl.dfs.core.windows.net/sales/transformed/Sales_Orders',
)
print("Transformed data saved!")

In [None]:
%%sql
-- All columns of the SparkDB.sales_orders rows for one specific item.
SELECT * FROM SparkDB.sales_orders
WHERE Item = 'Road-250 Red, 52'

In [None]:
# Run a SQL query from Python: every column of SparkDB.sales_orders plus Year
# and Month derived from OrderDate. The result is a DataFrame.
sql_transform = spark.sql("SELECT *, YEAR(OrderDate) AS Year, MONTH(OrderDate) AS Month FROM SparkDB.sales_orders")
# Preview the first 5 rows of the query result.
display(sql_transform.limit(5))

#### Create a Temp View

In [None]:
# Register the MonthlySales DataFrame as a temporary view named "MonthlySales"
# (replacing any existing view of that name) so it can be queried with SQL.
MonthlySales.createOrReplaceTempView("MonthlySales")

In [None]:
%%sql
-- Query the MonthlySales temporary view registered in the previous cell.
SELECT * FROM MonthlySales

#### Create a Global Temp View

In [None]:
# Register salesByProduct as a global temporary view named "salesByProduct"
# (replacing any existing one); the next cell queries it via the global_temp prefix.
salesByProduct.createOrReplaceGlobalTempView("salesByProduct")

In [None]:
%%sql
-- Query the salesByProduct global temporary view through the global_temp prefix.
SELECT * FROM global_temp.salesByProduct