In [1]:
import pyspark
import os
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession

# Build the Spark-on-Kubernetes session used by the rest of the notebook:
# client-mode driver running in this pod, dynamic executor allocation,
# Iceberg SparkSessionCatalog backed by a Hive metastore, and S3A storage.
#
# Changes vs. the previous version:
#   * The CA-certificate key was misspelled ("subdmission"), so Spark never read
#     it; it is now "spark.kubernetes.authenticate.submission.caCertFile".
#   * The S3A access/secret keys were hard-coded in the notebook. They are now
#     read from the environment, and a missing variable fails fast with KeyError.
#     NOTE(review): the previously committed keys should be treated as leaked
#     and rotated.
S3A_ACCESS_KEY = os.environ["AWS_ACCESS_KEY_ID"]
S3A_SECRET_KEY = os.environ["AWS_SECRET_ACCESS_KEY"]

conf = (
    SparkConf()
    # Driver / Kubernetes submission settings
    .set("spark.driver.bindAddress", "0.0.0.0")
    .set("spark.submit.deployMode", "client")
    .set("spark.kubernetes.driver.pod.name", os.getenv("MY_POD_NAME"))
    .set("spark.kubernetes.authenticate.submission.caCertFile", "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt")
    .set("spark.kubernetes.authenticate.submission.oauthTokenFile", "/var/run/secrets/kubernetes.io/serviceaccount/token")
    .set("spark.kubernetes.authenticate.driver.serviceAccountName", "spark-driver")
    .set("spark.kubernetes.namespace", os.getenv("MY_POD_NAMESPACE"))
    # Executors (dynamic allocation between 1 initial and at most 2)
    .set("spark.executor.instances", "1")
    .set("spark.dynamicAllocation.maxExecutors", "2")
    .set("spark.dynamicAllocation.enabled", "true")
    .set("spark.dynamicAllocation.shuffleTracking.enabled", "true")
    .set("spark.driver.host", "spark-driver")
    .set("spark.driver.port", "20020")
    .set("spark.kubernetes.executor.request.cores", "1")
    .set("spark.kubernetes.executor.limit.cores", "2")
    .set("spark.executor.memory", "12000m")
    .set("spark.kubernetes.container.image", "klovercloud/airflow-spark-k8s-driver:3.1.2")
    .set("spark.kubernetes.container.image.pullPolicy", "IfNotPresent")
    # Iceberg catalog on top of the Hive metastore
    .set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .set("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
    .set("spark.sql.catalog.spark_catalog.type", "hive")
    .set("spark.hadoop.hive.metastore.uris", "thrift://hive-metastore.analytics-dev:9083")
    .set("spark.hadoop.hive.metastore.warehouse.dir", "s3a://bigdata-dev-cmfcknil/warehouse/")
    # S3A object storage
    .set("spark.hadoop.fs.s3a.connection.ssl.enabled", "true")
    .set("spark.hadoop.fs.s3a.path.style.access", "true")
    .set("spark.hadoop.fs.s3a.endpoint", "https://kcs3.bd-1.wpc.waltonelectronics.com")
    .set("spark.hadoop.fs.s3a.access.key", S3A_ACCESS_KEY)
    .set("spark.hadoop.fs.s3a.secret.key", S3A_SECRET_KEY)
    .set("spark.hadoop.fs.s3a.attempts.maximum", "1")
    .set("spark.hadoop.fs.s3a.connection.establish.timeout", "500")
    .set("spark.hadoop.datanucleus.autoCreateSchema", "true")
    .set("spark.hadoop.datanucleus.fixedDatastore", "false")
)


spark = (
    SparkSession.builder
    .master("k8s://https://kubernetes.default:443")
    .appName("spark-test-app")
    .config(conf=conf)
    .enableHiveSupport()
    .getOrCreate()
)

22/01/25 06:51:20 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/01/25 06:51:25 WARN ExecutorAllocationManager: Dynamic allocation without a shuffle service is an experimental feature.


In [2]:
# List the databases visible through the session catalog (first 20 rows).
spark.sql("SHOW DATABASES;").show(n=20)

+---------+
|namespace|
+---------+
|  default|
|      ebs|
|      pos|
|   pqspos|
|      reg|
|    tmpdb|
+---------+



In [3]:
# Load the raw POS tables: sales invoice master, sale details (line items),
# products, customers and plazas -- in that order.
dfM, dfD, dfPD, dfC, dfP = (
    spark.sql(query)
    for query in (
        "select * from pos.pos___POS_SALE_MST;",
        "select * from pos.pos___POS_SALE_DTLS;",
        "select * from pos.pos___POS_PRODUCTS;",
        "select * from pos.pos___POS_CUSTOMERS;",
        "select * from pos.pos___POS_PLAZAS;",
    )
)

22/01/25 06:52:14 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties


In [4]:
# Plaza lookup: rename ID to pid (avoids clashing with other ID columns) and keep name + key.
dfP = dfP.withColumnRenamed("ID", "pid").select("NAME", "pid")

In [5]:
# Invoice master: rename the key to SID and project only the columns used downstream.
dfM = (
    dfM.withColumnRenamed("ID", "SID")
    .select("INVOICE_NO", "CREATED", "CREATED_BY", "S_TYPE", "IS_EMPLOYEE",
            "CUSTOMER_ID", "SID", "PLAZA_ID")
)

In [6]:
# Customer lookup: give key/name/phone/type unambiguous names, then project them.
dfC = (
    dfC.withColumnRenamed("ID", "CID")
    .withColumnRenamed("NAME", "cname")
    .withColumnRenamed("MOBILE_PHONE", "CUSTOMER_PHONE_NO")
    .withColumnRenamed("C_TYPE", "CUSTOMER_TYPE")
    .select("cname", "CUSTOMER_PHONE_NO", "CUSTOMER_TYPE", "CID")
)

In [7]:
# Product lookup: rename the key to prodid and keep the category columns.
dfPD = dfPD.withColumnRenamed("ID", "prodid").select("SEGMENT3", "SEGMENT5", "prodid")

In [8]:
# Line items: price, quantity and totals, plus the keys to the master and product tables.
dfD = dfD.select(
    "RATE", "QUANTITY", "S_TOTAL", "REBATE", "G_TOTAL",
    "SALE_MST_ID", "PRODUCT_ID",
)

In [9]:
# Keep only line items with RATE above 1500.
# NOTE(review): the threshold's unit/rationale is not documented here.
dfD = dfD.where(dfD["RATE"] > 1500)

In [10]:
# Attach line items to their invoice header.
df1 = dfM.join(dfD, on=dfM.SID == dfD.SALE_MST_ID, how="inner")

In [11]:

# Attach product category information to each line item.
df2 = df1.join(dfPD, on=df1.PRODUCT_ID == dfPD.prodid, how="inner")

In [12]:
# Attach customer details (customer columns come first).
df3 = dfC.join(df2, on=dfC.CID == df2.CUSTOMER_ID, how="inner")

In [13]:
# Attach the plaza (sales point) name (plaza columns come first).
df4 = dfP.join(df3, on=dfP.pid == df3.PLAZA_ID, how="inner")

In [14]:
# Rename the plaza name column. The column selected from POS_PLAZAS is "NAME";
# spelling it exactly keeps the rename working when spark.sql.caseSensitive is on
# (with "Name" the rename silently does nothing under a case-sensitive resolver).
df4 = df4.withColumnRenamed("NAME","PLAZA_NAME")

In [15]:
# The plaza key is redundant with PLAZA_ID after the join.
df4 = df4.drop(df4.pid)

In [16]:
# Give the customer name a readable column name.
df4 = df4.withColumnRenamed(existing="cname", new="CustomerName")

In [17]:
# CID duplicates CUSTOMER_ID after the customer join.
df4 = df4.drop(df4.CID)

In [18]:
# Exclude sales flagged as employee purchases. Rows where IS_EMPLOYEE is null
# are also dropped (null == False evaluates to null, which the filter rejects).
df4 = df4.where(df4["IS_EMPLOYEE"] == False)  # noqa: E712 -- Spark column comparison

In [19]:
# Keep Retail and Hire customers. contains() is a substring match, so any
# CUSTOMER_TYPE containing either word passes (unlike an exact IN list).
df4 = df4.where(
    df4["CUSTOMER_TYPE"].contains('Retail') | df4["CUSTOMER_TYPE"].contains('Hire')
)

In [20]:
# Drop records created by OPENING_HIRE (null CREATED_BY rows are dropped as well).
df4 = df4.where(df4["CREATED_BY"] != "OPENING_HIRE")

In [21]:
import pyspark.sql.functions as funcs

# Derive a DATE column from the CREATED timestamp (used for filtering and the Year column).
df4 = df4.withColumn("date_of_registration", funcs.to_date("CREATED"))

In [22]:
# The raw timestamp is no longer needed once date_of_registration exists.
df4 = df4.drop(df4.CREATED)

In [23]:

# Functions later cells use as bare names. This explicit import replaces
# `from pyspark.sql.functions import *`, which shadowed Python builtins
# (sum, min, max, round, abs, filter, ...) for the rest of the notebook.
# NOTE(review): ideally this belongs in the top imports cell.
from pyspark.sql.functions import col, count, lit, substring, when
import pyspark.sql.functions as F

# Keep only rows registered on or after 2016-01-01.
df4 = df4.filter(df4["date_of_registration"] >= lit('2016-01-01'))

In [24]:
# Cache the joined/filtered sales data. cache() returns the same DataFrame,
# so dfMaster and df4 refer to one cached object.
dfMaster = df4.cache()

In [25]:
# Give the customer phone column the name used throughout the cleaning steps.
dfMaster = dfMaster.withColumnRenamed(existing="CUSTOMER_PHONE_NO", new="CustomerPhoneNumber")

In [26]:
# Cache the renamed dataset and continue the phone-number cleaning on it as `df`
# (cache() returns the same DataFrame object).
df = dfMaster.cache()

## Remove null values from the CustomerPhoneNumber column

In [27]:
# Discard customers without a phone number.
df = df.where(df["CustomerPhoneNumber"].isNotNull())

# Remove rows whose CustomerPhoneNumber is exactly "0"

In [28]:
# Discard placeholder phone numbers that are exactly "0".
df = df.where(df["CustomerPhoneNumber"] != "0")

## Remove leading plus-minus (±) signs from the CustomerPhoneNumber column

In [29]:

# The redundant `from pyspark.sql.functions import *` was removed: it re-shadowed
# Python builtins, and every function used here is reached through F.
import pyspark.sql.functions as F

# Strip any leading '±' characters from the phone number.
df = df.withColumn('CustomerPhoneNumber', F.regexp_replace('CustomerPhoneNumber', r'^[±]*', ''))

## Remove punctuation from customerphonenumber column

In [30]:
# Delete every character that is neither a word character nor whitespace (punctuation,
# including '+', '/', '|').
df = df.withColumn(
    "CustomerPhoneNumber",
    F.regexp_replace(F.col("CustomerPhoneNumber"), r'[^\w\s]', ""),
)

## Remove all special characters from CustomerPhoneNumber column

In [31]:
# Delete every non-word character (this also removes whitespace).
df = df.withColumn(
    "CustomerPhoneNumber",
    F.regexp_replace(F.col("CustomerPhoneNumber"), r'\W', ""),
)


## Remove leading '8' characters from the CustomerPhoneNumber column

In [32]:
# Strip the whole run of leading '8' characters (not just one), e.g. "8801..." -> "01...".
df = df.withColumn(
    "CustomerPhoneNumber",
    F.regexp_replace(F.col("CustomerPhoneNumber"), r'^[8]*', ""),
)

# Dropping na values from customerphonenumber column

In [33]:
# Defensive null drop on the phone column (nulls were already filtered out above).
df = df.dropna(subset=["CustomerPhoneNumber"])

## Remove English letters from the CustomerPhoneNumber column

In [34]:
# Remove ASCII letters from the phone number.
df = df.withColumn(
    "CustomerPhoneNumber",
    F.regexp_replace(F.col("CustomerPhoneNumber"), r'[A-Za-z]', ""),
)

## Removing empty strings

In [35]:
# Discard numbers that became empty after the cleaning steps above.
df = df.where(df["CustomerPhoneNumber"] != "")

## Remove leading plus signs from the CustomerPhoneNumber column

In [36]:
# Strip leading '+' characters. (Effectively a no-op by now: the [^\w\s] and \W
# passes above already removed every '+'.)
df = df.withColumn(
    "CustomerPhoneNumber",
    F.regexp_replace(F.col("CustomerPhoneNumber"), r'^[+]*', ""),
)

## Remove | from customerphonenumber column

In [37]:
# Remove literal '|' characters. The pipe must be escaped: a bare '|' is regex
# alternation between two empty patterns, which only matches the empty string,
# so the previous pattern removed nothing.
df = df.withColumn('CustomerPhoneNumber', F.regexp_replace('CustomerPhoneNumber', r'\|', ''))

## Remove / from customerphonenumber column

In [38]:
# Remove '/' characters (already removed by the punctuation pass; kept for safety).
df = df.withColumn(
    "CustomerPhoneNumber",
    F.regexp_replace(F.col("CustomerPhoneNumber"), r'/', ""),
)

## Remove all non-english characters

In [39]:
# Remove every run of non-ASCII characters. The previous pattern used JavaScript-style
# /.../ delimiters. Java regex (used by regexp_replace) treats those slashes as literal
# characters, so it only matched non-ASCII text wrapped in '/'.
df = df.withColumn('CustomerPhoneNumber', F.regexp_replace('CustomerPhoneNumber', r'[^\x00-\x7F]+', ''))

## Removing empty strings after final data cleaning

In [40]:
# Discard numbers left empty by the final cleaning passes.
df = df.where(df["CustomerPhoneNumber"] != "")

# Removing all spaces from CustomerPhonenumber column

In [41]:
# Remove all whitespace. The previous pattern r'/\s/g' was JavaScript regex-literal
# syntax; in Java regex it matched only the literal text "/<whitespace>/g".
df = df.withColumn('CustomerPhoneNumber', F.regexp_replace('CustomerPhoneNumber', r'\s+', ''))




## Compute the length of each CustomerPhoneNumber

In [42]:
from pyspark.sql.functions import length

# Character count of the cleaned phone number, used by the 11-digit filter.
df = df.withColumn("length", F.length(F.col("CustomerPhoneNumber")))

## Remove all customers whose phone number is not exactly 11 digits long

In [43]:
# Keep only 11-character phone numbers.
df = df.where(df["length"] == 11)

## Remove all customers whose phone numbers do not start with 01

In [44]:
# Expose the cleaned data to SQL as table1.
df.createOrReplaceTempView(name="table1")

22/01/25 06:53:27 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


In [45]:
# Keep numbers starting with "01".
df10 = spark.sql(
    "SELECT * FROM table1 "
    "where CustomerPhoneNumber like '01%'"
)

## Remove all customers whose phone numbers start with 010, 011 or 012

In [46]:
# Expose the "01"-prefixed numbers as table2.
df10.createOrReplaceTempView(name="table2")

In [47]:
# Drop numbers starting with "010".
df12 = spark.sql(
    "SELECT * FROM table2 "
    "where CustomerPhoneNumber not like '010%'"
)

In [48]:
# Expose the result as table3.
df12.createOrReplaceTempView(name="table3")

In [49]:
# Drop numbers starting with "011".
df13 = spark.sql(
    "SELECT * FROM table3 "
    "where CustomerPhoneNumber not like '011%'"
)

In [50]:
# Expose the result as table4.
df13.createOrReplaceTempView(name="table4")

In [51]:
# Drop numbers starting with "012".
df15 = spark.sql(
    "SELECT * FROM table4 "
    "where CustomerPhoneNumber not like '012%'"
)

In [52]:
# Cache the prefix-filtered numbers and continue working on them as `df`.
df = df15.cache()

# Remove all customers whose phone numbers end in seven or eight zeros.

In [53]:
# Characters 4-11 of the 11-digit number, i.e. its last eight digits
# (the requested length 11 is clipped at the end of the string).
df = df.withColumn("remaining_strings", substring(df["CustomerPhoneNumber"], 4, 11))

In [54]:
# Drop numbers whose last eight digits are all zero.
df = df.where(df["remaining_strings"] != "00000000")

In [55]:
# Last seven digits of the number: characters 2-8 of the 8-character remaining_strings
# (characters 5-11 of the full number). The previous substring(..., 1, 7) took
# characters 4-10, so a number such as 01710000000 (last seven digits zero, 4th digit
# non-zero) was never removed by the next filter.
df = df.withColumn('remaining_new', substring('remaining_strings', 2, 7))

In [56]:
# Drop numbers whose remaining_new slice is all zeros.
df = df.where(df["remaining_new"] != "0000000")

In [57]:
# Employee phone numbers: one unnamed column with no header row, so it is read as _c0.
# Read as strings (no inferSchema). With inferSchema, numeric-looking values become
# int/long, which drops any leading '0', and those can never equal the "01..."
# strings in CustomerPhoneNumber.
employeephonenumber = spark.read.load(
    "s3a://bigdata-dev-cmfcknil/raw/init/posdb/pos/employee_list_final.csv",
    format='csv',
)

                                                                                

Let's rename the column _c0 to Number.

In [58]:
# Give the headerless column a meaningful name.
employeephonenumber = employeephonenumber.withColumnRenamed(existing="_c0", new="Number")

Let's remove leading '8' characters from the employee Number column.

In [59]:
# Strip leading '8' characters, as done for customer numbers.
# NOTE(review): the other customer-side cleaning steps (punctuation, spaces, letters)
# are not applied to employee numbers; confirm the source file is already clean.
employeephonenumber = employeephonenumber.withColumn(
    "Number",
    F.regexp_replace(F.col("Number"), r'^[8]*', ""),
)

Let's make the employee phone numbers distinct and convert them into a list.

In [60]:
# Unique employee numbers.
employeefinal = employeephonenumber.select("Number").dropDuplicates()

In [61]:
# Collect the distinct employee numbers to the driver as a Python list for isin().
# Nulls are excluded: NOT IN against a list containing NULL evaluates to NULL for
# every non-matching row, so one blank line in the CSV would make the
# `~isin(employee)` filter drop every customer.
employee = [
    row["Number"]
    for row in employeefinal.where(employeefinal["Number"].isNotNull()).collect()
]

                                                                                

Let's remove phone number of employees.

In [62]:
# Remove customers whose phone number belongs to an employee.
df = df.where(~df["CustomerPhoneNumber"].isin(employee))

## Check whether employees are removed or not.

In [63]:
# Sanity check: rows still matching an employee number (expected to be empty).
dfcheck = df.where(df["CustomerPhoneNumber"].isin(employee))

In [64]:
# Display up to five offending rows; an empty table means the removal worked.
dfcheck.show(n=5)

+----------+------------+-------------------+-------------+----------+----------+------+-----------+-----------+---+--------+----+--------+-------+------+-------+-----------+----------+--------+--------+------+--------------------+------+-----------------+-------------+
|PLAZA_NAME|CustomerName|CustomerPhoneNumber|CUSTOMER_TYPE|INVOICE_NO|CREATED_BY|S_TYPE|IS_EMPLOYEE|CUSTOMER_ID|SID|PLAZA_ID|RATE|QUANTITY|S_TOTAL|REBATE|G_TOTAL|SALE_MST_ID|PRODUCT_ID|SEGMENT3|SEGMENT5|prodid|date_of_registration|length|remaining_strings|remaining_new|
+----------+------------+-------------------+-------------+----------+----------+------+-----------+-----------+---+--------+----+--------+-------+------+-------+-----------+----------+--------+--------+------+--------------------+------+-----------------+-------------+
+----------+------------+-------------------+-------------+----------+----------+------+-----------+-----------+---+--------+----+--------+-------+------+-------+-----------+----------+--

## Preprocessing completed


In [65]:
# Rename so the column does not clash with PLAZA_NAME in the active-plazas file.
df = df.withColumnRenamed(existing="PLAZA_NAME", new="Account_Name")

In [66]:
# List of active plazas (CSV with a header row).
dfactive = spark.read.load(
    "s3a://bigdata-dev-cmfcknil/raw/init/posdb/pos/ACTIVE_PLAZAS_UP.csv",
    format='csv',
    header='true',
    inferSchema='true',
)

                                                                                

In [67]:
# Restrict sales to active plazas (active-plaza columns come first).
dfnew = dfactive.join(df, on=dfactive["PLAZA_NAME"] == df["Account_Name"], how="inner")

In [68]:
# Cache the active-plaza sales and continue working on them as `df`.
df = dfnew.cache()

In [69]:
# Keep only the product categories analysed below.
# NOTE(review): 'LCD TELEVISION' is not in this list, so the later
# LCD TELEVISION -> LED TELEVISION relabelling never matches a row;
# confirm whether LCD sets were meant to be included.
df = df.where(col("SEGMENT3").isin([
    'AIRCONDITIONER', 'REFRIGERATOR', '3D TELEVISION', 'ALL IN ONE',
    'AIRCONDITIONER-INDOOR', 'BEVERAGE COOLER', 'LAPTOP', 'DESKTOP PC',
    'SERVER', 'FREEZER', 'MICROWAVE OVEN', 'LED TELEVISION',
    'LED SMART TELEVISION', 'WASHING MACHINE',
]))

In [70]:
from pyspark.sql.functions import year

# Calendar year of registration.
df = df.withColumn("Year", year("date_of_registration"))




In [71]:
# Only phone number, category and year are needed from here on.
df = df.select(["CustomerPhoneNumber", "SEGMENT3", "Year"])

## Now convert laptop,desktop pc,server,all in one into computer

In [72]:
from pyspark.sql.functions import when

# Merge laptops, desktops, servers and all-in-ones into a single COMPUTER category.
df = df.withColumn(
    'SEGMENT3',
    F.when(
        df["SEGMENT3"].isin("LAPTOP", 'DESKTOP PC', 'SERVER', 'ALL IN ONE'),
        "COMPUTER",
    ).otherwise(df["SEGMENT3"]),
)


# Now convert 3D Television into led smart television

In [73]:
# Treat 3D televisions as LED smart televisions.
df = df.withColumn(
    'SEGMENT3',
    F.when(df["SEGMENT3"] == "3D TELEVISION", "LED SMART TELEVISION").otherwise(df["SEGMENT3"]),
)


# Now convert LCD TELEVISION into LED TELEVISION

In [74]:
# Treat LCD televisions as LED televisions.
# NOTE(review): currently matches nothing, because the category filter excludes LCD TELEVISION.
df = df.withColumn(
    'SEGMENT3',
    F.when(df["SEGMENT3"] == "LCD TELEVISION", "LED TELEVISION").otherwise(df["SEGMENT3"]),
)

# Now convert Beverage cooler into refrigerator

In [75]:
# Treat beverage coolers as refrigerators.
df = df.withColumn(
    'SEGMENT3',
    F.when(df["SEGMENT3"] == "BEVERAGE COOLER", "REFRIGERATOR").otherwise(df["SEGMENT3"]),
)

# Now convert airconditioner-indoor into airconditioner

In [76]:
# Treat indoor air-conditioner units as air conditioners.
df = df.withColumn(
    'SEGMENT3',
    F.when(df["SEGMENT3"] == "AIRCONDITIONER-INDOOR", "AIRCONDITIONER").otherwise(df["SEGMENT3"]),
)


In [77]:
# Per-purchase view (phone, normalised category, year), joined back in later.
dfselected = df.select(["CustomerPhoneNumber", "SEGMENT3", "Year"])

In [78]:
# Number of purchases per (customer, category).
dfcountfinal = (
    df.groupBy("CustomerPhoneNumber", "SEGMENT3")
    .agg(F.count("SEGMENT3").alias('productcount'))
)

In [79]:
# Global cap: drop (customer, category) pairs with 5+ purchases (likely resellers/bulk).
# The per-category caps below are stricter still.
dfcountfinal = dfcountfinal.where(dfcountfinal["productcount"] < 5)

In [80]:
# Air-conditioner purchase counts.
dfcountfinalaircon = dfcountfinal.where(dfcountfinal["SEGMENT3"] == "AIRCONDITIONER")

In [81]:
# At most four air conditioners per customer.
dfcountfinalaircon = dfcountfinalaircon.where(dfcountfinalaircon["productcount"] <= 4)

In [82]:
# Computer purchase counts.
dfcountfinalcomputer = dfcountfinal.where(dfcountfinal["SEGMENT3"] == "COMPUTER")

In [83]:
# At most three computers per customer.
dfcountfinalcomputer = dfcountfinalcomputer.where(dfcountfinalcomputer["productcount"] <= 3)

In [84]:
# Freezer purchase counts.
dfcountfinalfreezer = dfcountfinal.where(dfcountfinal["SEGMENT3"] == "FREEZER")

In [85]:
# At most three freezers per customer.
dfcountfinalfreezer = dfcountfinalfreezer.where(dfcountfinalfreezer["productcount"] <= 3)

In [86]:
# Television purchase counts (both LED and LED smart categories).
dfcountfinaltelevision = dfcountfinal.where(
    dfcountfinal["SEGMENT3"].isin("LED TELEVISION", "LED SMART TELEVISION")
)

In [87]:
# At most four televisions (per TV category) per customer.
dfcountfinaltelevision = dfcountfinaltelevision.where(dfcountfinaltelevision["productcount"] <= 4)

In [88]:
# Microwave-oven purchase counts.
dfcountfinalmicrowaveoven = dfcountfinal.where(dfcountfinal["SEGMENT3"] == "MICROWAVE OVEN")

In [89]:
# At most four microwave ovens per customer.
dfcountfinalmicrowaveoven = dfcountfinalmicrowaveoven.where(dfcountfinalmicrowaveoven["productcount"] <= 4)

In [90]:
# Refrigerator purchase counts.
dfcountfinalrefrigerator = dfcountfinal.where(dfcountfinal["SEGMENT3"] == "REFRIGERATOR")

In [91]:
# At most four refrigerators per customer.
dfcountfinalrefrigerator = dfcountfinalrefrigerator.where(dfcountfinalrefrigerator["productcount"] <= 4)

In [92]:
# Washing-machine purchase counts.
dfcountfinalwashingmachine = dfcountfinal.where(dfcountfinal["SEGMENT3"] == "WASHING MACHINE")

In [93]:
# At most three washing machines per customer.
dfcountfinalwashingmachine = dfcountfinalwashingmachine.where(dfcountfinalwashingmachine["productcount"] <= 3)

In [94]:
# Stack the capped per-category counts (positional UNION ALL): aircon + computer.
resultant = dfcountfinalaircon.union(other=dfcountfinalcomputer)

In [95]:
# ... + freezer
resultant1 = resultant.union(other=dfcountfinalfreezer)

In [96]:
# ... + television
resultant2 = resultant1.union(other=dfcountfinaltelevision)

In [97]:
# ... + microwave oven
resultant3 = resultant2.union(other=dfcountfinalmicrowaveoven)

In [98]:
# ... + refrigerator
resultant4 = resultant3.union(other=dfcountfinalrefrigerator)

In [99]:
# ... + washing machine
resultant5 = resultant4.union(other=dfcountfinalwashingmachine)

In [100]:
# Rename so the SQL join below has distinct key names on each side.
resultant5 = resultant5.withColumnRenamed(existing="CustomerPhoneNumber", new="Number")

In [101]:
# Rename so the SQL join below has distinct category names on each side.
dfselected = dfselected.withColumnRenamed(existing="SEGMENT3", new="Product")

In [102]:
# Expose both frames to Spark SQL for the join in the next cell.
# createOrReplaceTempView replaces registerTempTable, which is deprecated.
resultant5.createOrReplaceTempView("resultant5")
dfselected.createOrReplaceTempView("dfselected")



In [103]:
# Re-attach the purchase year to each capped (customer, category) count.
new = spark.sql(
    "SELECT dfselected.CustomerPhoneNumber,resultant5.SEGMENT3,dfselected.Year,resultant5.productcount "
    "from resultant5 inner join dfselected "
    "on resultant5.Number = dfselected.CustomerPhoneNumber "
    "and resultant5.SEGMENT3 = dfselected.Product"
)

In [104]:
# Cache the joined counts and continue working on them as `df`.
df = new.cache()

In [105]:
# The join repeats a row once per purchase; keep one row per distinct combination.
df = df.distinct()

In [106]:
# Start the indicator table from the de-duplicated rows (cache() returns the same DataFrame).
forrex = df.cache()

In [107]:
# Only (customer, category) is needed for the indicator columns.
forrex = forrex.drop("Year").drop("productcount")

In [108]:
# Indicator: 1 when the row's category is AIRCONDITIONER, otherwise 0 (including null).
forrex = forrex.withColumn(
    'AIRCONDITIONER',
    F.when(forrex["SEGMENT3"] == "AIRCONDITIONER", 1).otherwise(0),
)

In [109]:
# Indicator: 1 when the row's category is FREEZER, otherwise 0.
forrex = forrex.withColumn(
    'FREEZER',
    F.when(forrex["SEGMENT3"] == "FREEZER", 1).otherwise(0),
)

In [110]:
# Indicator: 1 when the row's category is REFRIGERATOR, otherwise 0.
forrex = forrex.withColumn(
    'REFRIGERATOR',
    F.when(forrex["SEGMENT3"] == "REFRIGERATOR", 1).otherwise(0),
)

In [111]:
# Indicator: 1 when the row's category is COMPUTER, otherwise 0.
forrex = forrex.withColumn(
    'COMPUTER',
    F.when(forrex["SEGMENT3"] == "COMPUTER", 1).otherwise(0),
)

In [112]:
# Indicator: 1 when the row's category is LED TELEVISION, otherwise 0.
forrex = forrex.withColumn(
    'LED TELEVISION',
    F.when(forrex["SEGMENT3"] == "LED TELEVISION", 1).otherwise(0),
)

In [113]:
# Indicator: 1 when the row's category is LED SMART TELEVISION, otherwise 0.
forrex = forrex.withColumn(
    'LED SMART TELEVISION',
    F.when(forrex["SEGMENT3"] == "LED SMART TELEVISION", 1).otherwise(0),
)

In [114]:
# Indicator: 1 when the row's category is WASHING MACHINE, otherwise 0.
forrex = forrex.withColumn(
    'WASHING MACHINE',
    F.when(forrex["SEGMENT3"] == "WASHING MACHINE", 1).otherwise(0),
)

In [115]:
# Indicator: 1 when the row's category is MICROWAVE OVEN, otherwise 0.
forrex = forrex.withColumn(
    'MICROWAVE OVEN',
    F.when(forrex["SEGMENT3"] == "MICROWAVE OVEN", 1).otherwise(0),
)

In [116]:
# The category is now encoded in the indicator columns.
forrex = forrex.drop(forrex.SEGMENT3)

In [117]:
# Per customer: number of remaining rows flagged AIRCONDITIONER.
forrexcountaircon = (
    forrex.select("CustomerPhoneNumber", "AIRCONDITIONER")
    .where(F.col("AIRCONDITIONER") != 0)
    .groupBy('CustomerPhoneNumber')
    .agg(F.count("AIRCONDITIONER").alias('airconditioner_count'))
)

In [118]:
# No-op removed: after the groupBy in the previous cell forrexcountaircon only has
# CustomerPhoneNumber and airconditioner_count, so drop("AIRCONDITIONER") did nothing.

In [119]:
# Per customer: number of remaining rows flagged FREEZER.
forrexcountfreezer = (
    forrex.select("CustomerPhoneNumber", "FREEZER")
    .where(F.col("FREEZER") != 0)
    .groupBy('CustomerPhoneNumber')
    .agg(F.count("FREEZER").alias('freezer_count'))
)

In [120]:
# No-op removed: after the groupBy in the previous cell forrexcountfreezer only has
# CustomerPhoneNumber and freezer_count, so drop("FREEZER") did nothing.

In [121]:
# Per customer: number of remaining rows flagged REFRIGERATOR.
forrexcountrefrigerator = (
    forrex.select("CustomerPhoneNumber", "REFRIGERATOR")
    .where(F.col("REFRIGERATOR") != 0)
    .groupBy('CustomerPhoneNumber')
    .agg(F.count("REFRIGERATOR").alias('refrigerator_count'))
)

In [122]:
# No-op removed: after the groupBy in the previous cell forrexcountrefrigerator only has
# CustomerPhoneNumber and refrigerator_count, so drop("REFRIGERATOR") did nothing.

In [123]:
# Per customer: number of remaining rows flagged COMPUTER.
forrexcountcomputer = (
    forrex.select("CustomerPhoneNumber", "COMPUTER")
    .where(F.col("COMPUTER") != 0)
    .groupBy('CustomerPhoneNumber')
    .agg(F.count("COMPUTER").alias('computer_count'))
)

In [124]:
# No-op removed: after the groupBy in the previous cell forrexcountcomputer only has
# CustomerPhoneNumber and computer_count, so drop("COMPUTER") did nothing.

In [125]:
# Per customer: number of remaining rows flagged LED TELEVISION.
forrexcountledtelevision = (
    forrex.select("CustomerPhoneNumber", "LED TELEVISION")
    .where(F.col("LED TELEVISION") != 0)
    .groupBy("CustomerPhoneNumber")
    .agg(F.count("LED TELEVISION").alias("Ledtelevision_count"))
)

In [126]:
# No-op removed: after the groupBy in the previous cell forrexcountledtelevision only has
# CustomerPhoneNumber and Ledtelevision_count, so drop("LED TELEVISION") did nothing.

In [127]:
# Per customer: number of remaining rows flagged LED SMART TELEVISION.
forrexcountledsmarttelevision = (
    forrex.select("CustomerPhoneNumber", "LED SMART TELEVISION")
    .where(F.col("LED SMART TELEVISION") != 0)
    .groupBy("CustomerPhoneNumber")
    .agg(F.count("LED SMART TELEVISION").alias("Ledsmarttelevision_count"))
)

In [128]:
# No-op removed: after the groupBy in the previous cell forrexcountledsmarttelevision only
# has CustomerPhoneNumber and Ledsmarttelevision_count, so dropping "LED SMART TELEVISION"
# did nothing.

In [129]:
# Per customer: number of remaining rows flagged WASHING MACHINE.
forrexcountwashingmachine = (
    forrex.select("CustomerPhoneNumber", "WASHING MACHINE")
    .where(F.col("WASHING MACHINE") != 0)
    .groupBy("CustomerPhoneNumber")
    .agg(F.count("WASHING MACHINE").alias("Washingmachine_count"))
)

In [130]:
# No-op removed: after the groupBy in the previous cell forrexcountwashingmachine only has
# CustomerPhoneNumber and Washingmachine_count, so drop("WASHING MACHINE") did nothing.

In [131]:
# Per customer: number of remaining rows flagged MICROWAVE OVEN.
forrexcountmicrowaveoven = (
    forrex.select("CustomerPhoneNumber", "MICROWAVE OVEN")
    .where(F.col("MICROWAVE OVEN") != 0)
    .groupBy("CustomerPhoneNumber")
    .agg(F.count("MICROWAVE OVEN").alias("Microwaveoven_count"))
)

In [132]:
# No-op removed: after the groupBy in the previous cell forrexcountmicrowaveoven only has
# CustomerPhoneNumber and Microwaveoven_count, so drop("MICROWAVE OVEN") did nothing.

In [133]:
# Combine per-category counts into one row per customer (full outer join on the phone number).
result = forrexcountaircon.join(forrexcountfreezer, on='CustomerPhoneNumber', how='full')

In [134]:
# ... + refrigerator counts
result1 = result.join(forrexcountrefrigerator, on='CustomerPhoneNumber', how='full')

In [135]:
# ... + computer counts
result2 = result1.join(forrexcountcomputer, on='CustomerPhoneNumber', how='full')

In [136]:
# ... + LED TV counts
result3 = result2.join(forrexcountledtelevision, on='CustomerPhoneNumber', how='full')

In [137]:
# ... + LED smart TV counts
result4 = result3.join(forrexcountledsmarttelevision, on='CustomerPhoneNumber', how='full')

In [138]:
# ... + washing machine counts
result5 = result4.join(forrexcountwashingmachine, on='CustomerPhoneNumber', how='full')

In [139]:
# ... + microwave oven counts
result6 = result5.join(forrexcountmicrowaveoven, on='CustomerPhoneNumber', how='full')

In [140]:
# A customer missing from a category's table bought none of it: nulls become 0.
result6 = result6.fillna(value=0)


We no longer need CustomerPhoneNumber, because the products have already been counted per customer. The remaining columns feed the forrex heat map that shows the relationships between products.

In [141]:
# One row per customer (presumably already the case, since every joined table is grouped by phone).
result6 = result6.drop_duplicates(subset=["CustomerPhoneNumber"])

In [142]:
# The phone number was only needed as the join/dedup key; drop it so only the
# per-category count columns remain.
result6 = result6.drop("CustomerPhoneNumber")

In [143]:
# Cache the per-customer count table (cache() returns the same DataFrame).
result7 = result6.cache()

In [145]:
# Binary flag: bought at least one air conditioner.
result7 = result7.withColumn('AIRCONDITIONER', F.when(F.col("airconditioner_count") >= 1, 1).otherwise(0))


In [146]:
# Binary flag: bought at least one freezer.
result7 = result7.withColumn('FREEZER', F.when(F.col("freezer_count") >= 1, 1).otherwise(0))

In [147]:
# Binary flag: bought at least one refrigerator.
result7 = result7.withColumn('REFRIGERATOR', F.when(F.col("refrigerator_count") >= 1, 1).otherwise(0))

In [148]:
# Binary flag: bought at least one computer.
result7 = result7.withColumn('COMPUTER', F.when(F.col("computer_count") >= 1, 1).otherwise(0))

22/01/25 07:07:32 WARN WatchConnectionManager: Exec Failure
java.io.EOFException
	at okio.RealBufferedSource.require(RealBufferedSource.java:61)
	at okio.RealBufferedSource.readByte(RealBufferedSource.java:74)
	at okhttp3.internal.ws.WebSocketReader.readHeader(WebSocketReader.java:117)
	at okhttp3.internal.ws.WebSocketReader.processNextFrame(WebSocketReader.java:101)
	at okhttp3.internal.ws.RealWebSocket.loopReader(RealWebSocket.java:274)
	at okhttp3.internal.ws.RealWebSocket$2.onResponse(RealWebSocket.java:214)
	at okhttp3.RealCall$AsyncCall.execute(RealCall.java:203)
	at okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
22/01/25 07:07:33 WARN ExecutorPodsWatchSnapshotSource: Kubernetes client has been closed (this is expected if the applica

In [149]:
# Binary flag: bought at least one LED television.
result7 = result7.withColumn('LEDtelevision', F.when(F.col("Ledtelevision_count") >= 1, 1).otherwise(0))

In [150]:
# Binary flag: bought at least one LED smart television.
result7 = result7.withColumn('LEDsmarttelevision', F.when(F.col("Ledsmarttelevision_count") >= 1, 1).otherwise(0))

In [151]:
# Binary flag: bought at least one washing machine.
result7 = result7.withColumn('Washingmachine', F.when(F.col("Washingmachine_count") >= 1, 1).otherwise(0))

In [152]:
# Binary flag: bought at least one microwave oven.
result7 = result7.withColumn('Microwaveoven', F.when(F.col("Microwaveoven_count") >= 1, 1).otherwise(0))

In [154]:
# Keep only the binary flags; the raw counts are no longer needed.
result7 = result7.drop(
    "airconditioner_count",
    "freezer_count",
    "refrigerator_count",
    "computer_count",
    "Ledtelevision_count",
    "Ledsmarttelevision_count",
    "Washingmachine_count",
    "Microwaveoven_count",
)

In [155]:
# Number of distinct categories each customer bought.
result7 = result7.withColumn(
    "sum",
    col("AIRCONDITIONER")
    + col("FREEZER")
    + col("REFRIGERATOR")
    + col("COMPUTER")
    + col("LEDtelevision")
    + col("LEDsmarttelevision")
    + col("Washingmachine")
    + col("Microwaveoven"),
)


In [157]:
# Drop single-category customers. Bracket access avoids relying on attribute
# lookup for a column named like a common method ("sum").
result7 = result7.where(result7["sum"] != 1)

In [158]:
# The helper sum column is not part of the exported table.
result7 = result7.drop(result7["sum"])

In [160]:
# Export the per-customer category indicator table as CSV (with a header row).
# NOTE(review): limit() without an ordering returns an arbitrary subset if the table has
# more than 183395 rows; confirm whether this hard-coded cap is intended.
forrextotalnore4 = result7.limit(183395)

forrextotalnore4.write.save(
    "s3a://bigdata-dev-cmfcknil/raw/init/posdb/pos/forrexheatmapemployeeremovedfinal7.csv",
    format="csv",
    header='true',
)


22/01/25 08:26:29 WARN AbstractS3ACommitterFactory: Using standard FileOutputCommitter to commit work. This is slow and potentially unsafe.
22/01/25 08:26:29 WARN DAGScheduler: Broadcasting large task binary with size 1223.6 KiB
                                                                                