In [0]:
#Importing Pyspark Libraries
from pyspark.sql import SparkSession
from pyspark.sql.types  import StructField,StructType,IntegerType,LongType,StringType
from pyspark.sql.functions import col, pandas_udf, PandasUDFType,to_str,to_date,unix_timestamp, from_unixtime
#https://spark.apache.org/docs/latest/sql-ref-datatypes.html --> different type of datatypes

#Importing Python Libraries
import pandas as pd
import numpy as np
import re

#Importing Koalas Dataframe Library
import databricks.koalas as ks

#Importing UDF libraries
from pyspark.sql.functions import udf
from pyspark.sql.functions import pandas_udf, PandasUDFType



In [0]:
# Obtain the SparkSession for this notebook (getOrCreate reuses an existing
# session when there is one), registered under the application name "Spark_UDF".
SS = (
    SparkSession.builder
    .appName("Spark_UDF")
    .getOrCreate()
)

In [0]:
# DBFS locations of the input files referenced by this notebook.
Sales_location = "/FileStore/tables/Sales_Prediction.csv"
Cust_segmentation_location = "/FileStore/tables/Customer_Segmentation.xlsx"
attendance_location = "/FileStore/tables/Attendance_Data.xlsx"

# Read the sales CSV through the path constant instead of a second hard-coded
# copy of it; prefixing the "dbfs:" scheme yields exactly the path that was
# previously spelled out inline. The first row is treated as the header and
# column types are inferred from the data.
df = SS.read.csv("dbfs:" + Sales_location, header=True, inferSchema=True)

# Materialise the Spark DataFrame as a pandas DataFrame; the pandas and Koalas
# examples further down operate on this copy.
df_pandas = df.toPandas()

In [0]:
# Render the raw sales DataFrame as a table in the notebook output.
display(df)

Item_Identifier,Item_Weight,Item_Fat_Content,Item_Visibility,Item_Type,Item_MRP,Outlet_Identifier,Outlet_Establishment_Year,Outlet_Size,Outlet_Location_Type,Outlet_Type,Item_Outlet_Sales
FDA15,9.3,Low Fat,0.016047301,Dairy,249.8092,OUT049,1999,Medium,Tier 1,Supermarket Type1,3735.138
DRC01,5.92,Regular,0.019278216,Soft Drinks,48.2692,OUT018,2009,Medium,Tier 3,Supermarket Type2,443.4228
FDN15,17.5,Low Fat,0.016760075,Meat,141.618,OUT049,1999,Medium,Tier 1,Supermarket Type1,2097.27
FDX07,19.2,Regular,0.0,Fruits and Vegetables,182.095,OUT010,1998,,Tier 3,Grocery Store,732.38
NCD19,8.93,Low Fat,0.0,Household,53.8614,OUT013,1987,High,Tier 3,Supermarket Type1,994.7052
FDP36,10.395,Regular,0.0,Baking Goods,51.4008,OUT018,2009,Medium,Tier 3,Supermarket Type2,556.6088
FDO10,13.65,Regular,0.012741089,Snack Foods,57.6588,OUT013,1987,High,Tier 3,Supermarket Type1,343.5528
FDP10,,Low Fat,0.127469857,Snack Foods,107.7622,OUT027,1985,Medium,Tier 3,Supermarket Type3,4022.7636
FDH17,16.2,Regular,0.016687114,Frozen Foods,96.9726,OUT045,2002,,Tier 2,Supermarket Type1,1076.5986
FDU28,19.2,Regular,0.09444959,Frozen Foods,187.8214,OUT017,2007,,Tier 2,Supermarket Type1,4710.535


In [0]:
@udf('integer')
def Fat_content_Change(v):
  """Spark UDF encoding an Item_Fat_Content label as an integer flag.

  Returns 0 when ``v`` is exactly 'low fat', 'Low Fat' or 'LF', and 1 for
  every other value (a missing value therefore also yields 1).
  """
  is_low_fat = v in ('low fat', 'Low Fat', 'LF')
  return 0 if is_low_fat else 1

# Add the encoded flag as a new column alongside the original label.
df3 = df.withColumn('Fat_content_update', Fat_content_Change(df['Item_Fat_Content']))

In [0]:
# Show df3, which now carries the Fat_content_update column.
df3.display()

Item_Identifier,Item_Weight,Item_Fat_Content,Item_Visibility,Item_Type,Item_MRP,Outlet_Identifier,Outlet_Establishment_Year,Outlet_Size,Outlet_Location_Type,Outlet_Type,Item_Outlet_Sales,Fat_content_update
FDA15,9.3,Low Fat,0.016047301,Dairy,249.8092,OUT049,1999,Medium,Tier 1,Supermarket Type1,3735.138,0
DRC01,5.92,Regular,0.019278216,Soft Drinks,48.2692,OUT018,2009,Medium,Tier 3,Supermarket Type2,443.4228,1
FDN15,17.5,Low Fat,0.016760075,Meat,141.618,OUT049,1999,Medium,Tier 1,Supermarket Type1,2097.27,0
FDX07,19.2,Regular,0.0,Fruits and Vegetables,182.095,OUT010,1998,,Tier 3,Grocery Store,732.38,1
NCD19,8.93,Low Fat,0.0,Household,53.8614,OUT013,1987,High,Tier 3,Supermarket Type1,994.7052,0
FDP36,10.395,Regular,0.0,Baking Goods,51.4008,OUT018,2009,Medium,Tier 3,Supermarket Type2,556.6088,1
FDO10,13.65,Regular,0.012741089,Snack Foods,57.6588,OUT013,1987,High,Tier 3,Supermarket Type1,343.5528,1
FDP10,,Low Fat,0.127469857,Snack Foods,107.7622,OUT027,1985,Medium,Tier 3,Supermarket Type3,4022.7636,0
FDH17,16.2,Regular,0.016687114,Frozen Foods,96.9726,OUT045,2002,,Tier 2,Supermarket Type1,1076.5986,1
FDU28,19.2,Regular,0.09444959,Frozen Foods,187.8214,OUT017,2007,,Tier 2,Supermarket Type1,4710.535,1


In [0]:
def Fat_content_Change(v):
  """Plain-Python fat-content encoder, for use with pandas ``apply``.

  Note that this definition rebinds the name ``Fat_content_Change``, which an
  earlier cell bound to a Spark UDF implementing the same rule.

  Returns 0 when ``v`` is exactly 'low fat', 'Low Fat' or 'LF', otherwise 1.
  """
  low_fat_labels = ('low fat', 'Low Fat', 'LF')
  if v in low_fat_labels:
    return 0
  return 1
# Apply the encoder element-wise to the pandas copy of the data.
df_pandas['Fat_content_update'] = df_pandas['Item_Fat_Content'].apply(Fat_content_Change)

In [0]:
@pandas_udf('Integer', PandasUDFType.SCALAR)
def ordered_quantity(u,v):
  """Scalar pandas UDF returning the element-wise ratio ``v / u``.

  The UDF is declared with an integer return type although the division
  produces fractional values; in the displayed output the results appear
  truncated (e.g. 3735.138 / 249.8092 is shown as 14).
  """
  ratio = v / u
  return ratio

# u is Item_MRP and v is Item_Outlet_Sales, so the new column is sales / MRP.
df3 = df3.withColumn('Ordered_quantity', ordered_quantity(df3['Item_MRP'], df3['Item_Outlet_Sales']))



In [0]:
# Show df3 including the newly added Ordered_quantity column.
df3.display()

Item_Identifier,Item_Weight,Item_Fat_Content,Item_Visibility,Item_Type,Item_MRP,Outlet_Identifier,Outlet_Establishment_Year,Outlet_Size,Outlet_Location_Type,Outlet_Type,Item_Outlet_Sales,Fat_content_update,Ordered_quantity
FDA15,9.3,Low Fat,0.016047301,Dairy,249.8092,OUT049,1999,Medium,Tier 1,Supermarket Type1,3735.138,0,14
DRC01,5.92,Regular,0.019278216,Soft Drinks,48.2692,OUT018,2009,Medium,Tier 3,Supermarket Type2,443.4228,1,9
FDN15,17.5,Low Fat,0.016760075,Meat,141.618,OUT049,1999,Medium,Tier 1,Supermarket Type1,2097.27,0,14
FDX07,19.2,Regular,0.0,Fruits and Vegetables,182.095,OUT010,1998,,Tier 3,Grocery Store,732.38,1,4
NCD19,8.93,Low Fat,0.0,Household,53.8614,OUT013,1987,High,Tier 3,Supermarket Type1,994.7052,0,18
FDP36,10.395,Regular,0.0,Baking Goods,51.4008,OUT018,2009,Medium,Tier 3,Supermarket Type2,556.6088,1,10
FDO10,13.65,Regular,0.012741089,Snack Foods,57.6588,OUT013,1987,High,Tier 3,Supermarket Type1,343.5528,1,5
FDP10,,Low Fat,0.127469857,Snack Foods,107.7622,OUT027,1985,Medium,Tier 3,Supermarket Type3,4022.7636,0,37
FDH17,16.2,Regular,0.016687114,Frozen Foods,96.9726,OUT045,2002,,Tier 2,Supermarket Type1,1076.5986,1,11
FDU28,19.2,Regular,0.09444959,Frozen Foods,187.8214,OUT017,2007,,Tier 2,Supermarket Type1,4710.535,1,25


In [0]:
@pandas_udf('String', PandasUDFType.SCALAR)
def outlet_status(v):
  """Scalar pandas UDF flagging whether each Outlet_Size entry is a known size.

  Entries equal to "Medium", "High" or "Small" map to "Value present";
  anything else (such as a missing size) maps to "no Value".
  """
  known_sizes = ("Medium", "High", "Small")
  labels = ["Value present" if size in known_sizes else "no Value" for size in v]
  return pd.Series(labels)

# Attach the presence flag derived from Outlet_Size.
df3 = df3.withColumn('Outlet_Size_status', outlet_status(df3['Outlet_Size']))

In [0]:
# Show df3 including the newly added Outlet_Size_status column.
df3.display()

Item_Identifier,Item_Weight,Item_Fat_Content,Item_Visibility,Item_Type,Item_MRP,Outlet_Identifier,Outlet_Establishment_Year,Outlet_Size,Outlet_Location_Type,Outlet_Type,Item_Outlet_Sales,Fat_content_update,Ordered_quantity,Outlet_Size_status
FDA15,9.3,Low Fat,0.016047301,Dairy,249.8092,OUT049,1999,Medium,Tier 1,Supermarket Type1,3735.138,0,14,Value present
DRC01,5.92,Regular,0.019278216,Soft Drinks,48.2692,OUT018,2009,Medium,Tier 3,Supermarket Type2,443.4228,1,9,Value present
FDN15,17.5,Low Fat,0.016760075,Meat,141.618,OUT049,1999,Medium,Tier 1,Supermarket Type1,2097.27,0,14,Value present
FDX07,19.2,Regular,0.0,Fruits and Vegetables,182.095,OUT010,1998,,Tier 3,Grocery Store,732.38,1,4,no Value
NCD19,8.93,Low Fat,0.0,Household,53.8614,OUT013,1987,High,Tier 3,Supermarket Type1,994.7052,0,18,Value present
FDP36,10.395,Regular,0.0,Baking Goods,51.4008,OUT018,2009,Medium,Tier 3,Supermarket Type2,556.6088,1,10,Value present
FDO10,13.65,Regular,0.012741089,Snack Foods,57.6588,OUT013,1987,High,Tier 3,Supermarket Type1,343.5528,1,5,Value present
FDP10,,Low Fat,0.127469857,Snack Foods,107.7622,OUT027,1985,Medium,Tier 3,Supermarket Type3,4022.7636,0,37,Value present
FDH17,16.2,Regular,0.016687114,Frozen Foods,96.9726,OUT045,2002,,Tier 2,Supermarket Type1,1076.5986,1,11,no Value
FDU28,19.2,Regular,0.09444959,Frozen Foods,187.8214,OUT017,2007,,Tier 2,Supermarket Type1,4710.535,1,25,no Value


In [0]:
@pandas_udf(df3.schema, PandasUDFType.GROUPED_MAP)
def Variance(v):
    """Grouped-map pandas UDF: replace Item_Weight by its squared deviation from 12.85.

    ``v`` is the pandas DataFrame for one group. The result has the same
    columns (the declared output schema is ``df3.schema``), with Item_Weight
    overwritten by ``(12.85 - Item_Weight) ** 2``.
    """
    # NOTE(review): 12.85 looks like a reference (mean?) item weight — confirm its origin.
    squared_deviation = (12.85 - v['Item_Weight']) ** 2
    return v.assign(Item_Weight=squared_deviation)

# Run the UDF once per Item_Type group.
df33 = df3.groupby('Item_Type').apply(Variance)



In [0]:
# Show only the identifier and the transformed weight from the grouped result.
df33.select('Item_Identifier', 'Item_Weight').display()

Item_Identifier,Item_Weight
FDP36,6.027025
FDW12,
FDC37,
FDL12,9.0
FDL12,9.0
FDN48,
FDR12,
FDA47,5.522499999999998
FDG12,38.626225
FDB36,54.538225


In [0]:
# Convert the Spark DataFrame into a pandas-on-Spark DataFrame so that it can
# be manipulated through the pandas-style API used in the cells below.
df3_spark_pandas = df3.to_pandas_on_spark()

# https://databricks.com/blog/2021/10/04/pandas-api-on-upcoming-apache-spark-3-2.html

In [0]:
def outliet_size_update(x):
  """Map an Outlet_Size label to an ordinal code.

  'Small' -> 1, 'Medium' -> 2, 'High' -> 3; every other value, including a
  missing size (None / NaN), -> 0.

  Labels are matched after stripping surrounding whitespace, so both 'Medium'
  (the form that appears in the data) and ' Medium' map to 2.
  """
  # Non-string inputs (e.g. None or NaN for a missing size) have no category.
  if not isinstance(x, str):
    return 0
  size_codes = {'Small': 1, 'Medium': 2, 'High': 3}
  return size_codes.get(x.strip(), 0)
  
# Encode Outlet_Size on the pandas-on-Spark DataFrame with the plain Python mapper.
df3_spark_pandas['Outlet_categorical'] = df3_spark_pandas['Outlet_Size'].apply(outliet_size_update)  

In [0]:
# Encode Outlet_Size on the plain pandas DataFrame with the same mapper.
df_pandas['Outlet_categorical'] = df_pandas['Outlet_Size'].apply(outliet_size_update)

In [0]:
# Build a Koalas DataFrame from the pandas copy of the data. (koalas is also
# imported in the first cell of the notebook; the import is repeated here.)
import databricks.koalas as ks
kdf = ks.from_pandas(df_pandas)

In [0]:
# Compute the encoding through the Koalas Series.apply API. If the cells ran in
# order, kdf already holds an Outlet_categorical column copied from df_pandas,
# and this assignment overwrites it.
kdf['Outlet_categorical'] = kdf['Outlet_Size'].apply(outliet_size_update)  