In [1]:
import findspark

# Locate the local Spark installation and make `pyspark` importable; this has
# to run before the `pyspark` imports in the cells below.
findspark.init()

In [2]:
import os
import sys

# Run PySpark workers with the same interpreter as this kernel.
os.environ['PYSPARK_PYTHON'] = sys.executable
# NOTE(review): these two only affect the `bin/pyspark` launcher; they are
# kept as-is but look unnecessary when the session is created from a kernel.
os.environ['PYSPARK_DRIVER_PYTHON'] = 'jupyter'
os.environ['PYSPARK_DRIVER_PYTHON_OPTS'] = 'notebook'

# Point Spark at the local Hadoop build.
hadoop_home = "D:\\HADOOP\\hadoop-3.4.0-win10-x64\\"
os.environ["HADOOP_HOME"] = hadoop_home

# Add the Java and Hadoop `bin` directories to PATH.
# Python stores environment values literally, so a "%JAVA_HOME%" placeholder
# would never be expanded; resolve the real directory instead and skip it when
# JAVA_HOME is not defined.
extra_path_dirs = []
java_home = os.environ.get("JAVA_HOME")
if java_home:
    extra_path_dirs.append(os.path.join(java_home, "bin"))
extra_path_dirs.append(hadoop_home + "bin")

# Append only the entries that are missing, so re-running this cell does not
# keep growing PATH with duplicates.
current_path = os.environ.get("PATH", "")
existing_entries = current_path.split(os.pathsep)
missing_dirs = [d for d in extra_path_dirs if d not in existing_entries]
if missing_dirs:
    prefix = [current_path] if current_path else []
    os.environ["PATH"] = os.pathsep.join(prefix + missing_dirs)

In [3]:
import pandas as pd
from pyspark.sql.session import SparkSession
from pyspark.conf import SparkConf
from pyspark.sql.functions import when
from pyspark.sql.functions import col
from pyspark.sql.functions import to_date
from pyspark.sql.functions import year
from pyspark.sql.functions import month

# Spark settings for this session, listed as (key, value) pairs and applied in
# the order given.
spark_settings = [
    ("spark.sql.adaptive.enabled", True),
    ("spark.driver.memory", "20g"),
    ("spark.executor.memory", "20g"),
    ("spark.sql.execution.arrow.pyspark.enabled", True),
    ('spark.sql.repl.eagerEval.enabled', True),
    # NOTE(review): this looks like a legacy shuffle option; confirm the Spark
    # version in use still reads it.
    ("spark.shuffle.consolidateFiles", True),
    ("spark.driver.maxResultSize", "20g"),
]
conf = SparkConf().setAll(spark_settings)

# Create the session for this notebook, or reuse one that already exists.
session_builder = SparkSession.builder.appName('Monthly Persona').config(conf=conf)
spark = session_builder.getOrCreate()

In [4]:
# Display the SparkSession object created in the previous cell.
spark

In [5]:
from pyspark.sql import functions as F

In [6]:
# 2023 raw transactions joined with article attributes.
# NOTE(review): this reads one specific part file rather than the enclosing
# .parquet directory; confirm the dataset has no other part files.
raw_2023_path = r"D:\Users\TEST\2023_raw_transaction_with_article.parquet\part-00000-d571a0ee-d9db-4a2c-8716-73f7e9a59db0-c000.snappy.parquet"
df = spark.read.parquet(raw_2023_path)

In [7]:
# Print the column names and types of the loaded DataFrame.
df.printSchema()

root
 |-- BILLING_DATE: date (nullable = true)
 |-- TICKET_NO: string (nullable = true)
 |-- STORE: string (nullable = true)
 |-- STORE_ZONE: string (nullable = true)
 |-- STORE_ZONE_GRP: string (nullable = true)
 |-- ARTICLE: integer (nullable = true)
 |-- MC: string (nullable = true)
 |-- MCH2: string (nullable = true)
 |-- MCH3: string (nullable = true)
 |-- BRAND: string (nullable = true)
 |-- SALES_QTY: double (nullable = true)
 |-- APPROXIMATE_SALES: double (nullable = true)
 |-- MAIN_IDENTIFIER_NO: string (nullable = true)
 |-- APPLY_DATE: string (nullable = true)
 |-- MEMBER_STATUS: string (nullable = true)
 |-- TENDER_ID: string (nullable = true)
 |-- CREDIT_CARD_PROVIDER: string (nullable = true)
 |-- Tender_Type6: string (nullable = true)
 |-- Tender_Type2: string (nullable = true)
 |-- Tender_Type: string (nullable = true)
 |-- MST_TENDER_TYPE: string (nullable = true)
 |-- TIER_NAME: string (nullable = true)
 |-- HOMECARD_PROCARD: string (nullable = true)
 |-- BILLING_YEAR

In [9]:
# Drop rows whose CUST_ID is the placeholder string 'NULL'. Rows where CUST_ID
# is a real null are dropped too: the comparison evaluates to null for them and
# filter() keeps only rows where the condition is true.
# NOTE(review): CUST_ID is not in the visible part of the printed schema;
# confirm the column exists on `df`.
cust_id_is_set = df["CUST_ID"] != 'NULL'
df2 = df.filter(cust_id_is_set)

In [10]:
# MCH3 codes used below to select the "construction" rows.
construction = [
    'BM', 'DW', 'HT', 'HW',
    'PA', 'PB', 'PT', 'BR',
    'FC', 'LT', 'DH', 'ET',
]

In [11]:
# Keep only the rows whose MCH3 code is one of the `construction` codes.
in_construction = df2["MCH3"].isin(construction)
technician = df2.filter(in_construction)

In [12]:
# Grouping keys for the aggregation below.
use_col = [
    'MCH3',
    'MCH2',
    'MC',
    'ARTICLE',
    'STORE_GROUP',
]

In [13]:
# Metrics computed for every combination of the `use_col` grouping keys:
# summed quantity and sales, plus distinct member and ticket counts.
metrics = [
    F.sum(F.col('SALES_QTY')).alias('Sum_QTY'),
    F.sum(F.col('APPROXIMATE_SALES')).alias('Sum_Sales'),
    F.countDistinct(F.col('MAIN_IDENTIFIER_NO')).alias('Member'),
    F.countDistinct(F.col('TICKET_NO')).alias('Transaction'),
]
technician_agg = technician.groupBy(*use_col).agg(*metrics)

# Collect the aggregated rows to the driver as a pandas DataFrame.
technician_pandas = technician_agg.toPandas()

In [None]:
# Preview the first rows of the aggregated result.
technician_pandas.head()

Unnamed: 0,MCH3,MCH2,MC,ARTICLE,STORE_GROUP,Sum_QTY,Sum_Sales,Member,Transaction
0,BM,BM01,BM010202,1040813,ONLINE,59.0,138650.0,14,19
1,BM,BM01,BM010401,1181554,ONLINE,24.0,23830.0,10,14
2,BM,BM02,BM020101,1127245,MEGAHOME,11202.0,964039.150781,103,173
3,BM,BM02,BM020101,1205598,HOMEPRO,506.0,58051.41,6,15
4,BM,BM02,BM020105,1205577,MEGAHOME,15367.0,730925.873539,274,394


In [14]:
# List the distinct MCH3 codes present in the aggregated result, to compare
# against the `construction` codes used for filtering.
technician_pandas['MCH3'].unique()

array(['BM', 'BR', 'DH', 'DW', 'ET', 'FC', 'HT', 'HW', 'LT', 'PA', 'PB',
       'PT'], dtype=object)

In [28]:
# Export the aggregated result. The frame's index is only a positional row
# counter, so it is left out; otherwise it lands in the file as an unnamed
# first column that shows up as "Unnamed: 0" when the CSV is read back.
technician_pandas.to_csv("technician2.csv", index=False)