In [1]:
# Initialization: point this kernel at the local Spark installation.
import os
import sys

os.environ.update({
    "SPARK_HOME": "/home/talentum/spark",
    # Python interpreters handed to PySpark; use /usr/bin/python2.7 for both if you want Python 2.
    "PYSPARK_PYTHON": "/usr/bin/python3.6",
    "PYSPARK_DRIVER_PYTHON": "/usr/bin/python3",
})
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"

# Put the bundled PySpark and Py4J archives at the front of the import path
# (pyspark.zip first, py4j right after it).
sys.path[:0] = [
    os.environ["PYLIB"] + "/pyspark.zip",
    os.environ["PYLIB"] + "/py4j-0.10.7-src.zip",
]

# NOTE: list every extra package the session needs in PYSPARK_SUBMIT_ARGS.
# Alternatives kept for reference:
# os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.11:0.6.0 pyspark-shell'
# os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-avro_2.11:2.4.0 pyspark-shell'
# os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.11:0.6.0,org.apache.spark:spark-avro_2.11:2.4.0 pyspark-shell'
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.11:0.6.0,org.apache.spark:spark-avro_2.11:2.4.3 pyspark-shell'

In [2]:
# Entry point (Spark 2.x): build, or reuse, a Hive-enabled SparkSession.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("Spark SQL basic example")
    .enableHiveSupport()
    .getOrCreate()
)

# To run on YARN instead, add .master("yarn") to the builder chain:
# spark = SparkSession.builder.appName("Spark SQL basic example").enableHiveSupport().master("yarn").getOrCreate()

# Handle to the underlying SparkContext.
sc = spark.sparkContext

In [3]:
from pyspark.sql.functions import *

In [4]:
# Load the input CSV: the first row supplies the column names and the column types are inferred.
file_path = "file:///home/talentum/shared/data/e-shop_clothing_2008.csv"
inputDF = spark.read.options(header=True, inferSchema=True).csv(file_path)
inputDF.show(n=5)

+----+-----+---+-----+-------+----------+----------------------+-----------------------+------+--------+-----------------+-----+-------+----+
|year|month|day|order|country|session ID|page 1 (main category)|page 2 (clothing model)|colour|location|model photography|price|price 2|page|
+----+-----+---+-----+-------+----------+----------------------+-----------------------+------+--------+-----------------+-----+-------+----+
|2008|    4|  1|    1|     29|         1|                     1|                    A13|     1|       5|                1|   28|      2|   1|
|2008|    4|  1|    2|     29|         1|                     1|                    A16|     1|       6|                1|   33|      2|   1|
|2008|    4|  1|    3|     29|         1|                     2|                     B4|    10|       2|                1|   52|      1|   1|
|2008|    4|  1|    4|     29|         1|                     2|                    B17|     6|       6|                2|   38|      2|   1|
|2008|

In [5]:
# Give the awkwardly named columns (spaces, parentheses) identifier-friendly names.
inputDF = (
    inputDF
    .withColumnRenamed('page 1 (main category)', 'Product_Category')
    .withColumnRenamed('page 2 (clothing model)', 'Clothing_Model')
    .withColumnRenamed('session ID', 'session_ID')
    .withColumnRenamed('model photography', 'model_photography')
    .withColumnRenamed('price 2', 'price_2')
)

inputDF.columns

['year',
 'month',
 'day',
 'order',
 'country',
 'session_ID',
 'Product_Category',
 'Clothing_Model',
 'colour',
 'location',
 'model_photography',
 'price',
 'price_2',
 'page']

In [6]:
# Cardinality check: print the number of distinct values in every column.
for column in inputDF.columns:
    distinct_values = inputDF.select(column).distinct()
    print(column, " : ", distinct_values.count())

year  :  1
month  :  5
day  :  31
order  :  195
country  :  47
session_ID  :  24026
Product_Category  :  4
Clothing_Model  :  217
colour  :  14
location  :  6
model_photography  :  2
price  :  20
price_2  :  2
page  :  5


In [7]:
# Analysing features
# year, month, day -> Not useful, as they only cover a short time span
# order -> Useful
# country -> Useful (main category for classification)
# session ID -> Useful
# Product_Category, Clothing_Model, colour, location, model photography, price (dollar), price 2, page -> Useful

In [8]:
# Drop the date columns (year, month, day) and preview the remaining features.
inputDF1 = inputDF.drop(*['year', 'month', 'day'])
inputDF1.show(n=5)

+-----+-------+----------+----------------+--------------+------+--------+-----------------+-----+-------+----+
|order|country|session_ID|Product_Category|Clothing_Model|colour|location|model_photography|price|price_2|page|
+-----+-------+----------+----------------+--------------+------+--------+-----------------+-----+-------+----+
|    1|     29|         1|               1|           A13|     1|       5|                1|   28|      2|   1|
|    2|     29|         1|               1|           A16|     1|       6|                1|   33|      2|   1|
|    3|     29|         1|               2|            B4|    10|       2|                1|   52|      1|   1|
|    4|     29|         1|               2|           B17|     6|       6|                2|   38|      2|   1|
|    5|     29|         1|               2|            B8|     4|       3|                2|   52|      1|   1|
+-----+-------+----------+----------------+--------------+------+--------+-----------------+-----+------

In [9]:
# NULL check: print, for every column, how many rows hold a NULL in it.
for column in inputDF1.columns:
    null_rows = inputDF1.where(col(column).isNull())
    print(column, ":", null_rows.count())

order : 0
country : 0
session_ID : 0
Product_Category : 0
Clothing_Model : 0
colour : 0
location : 0
model_photography : 0
price : 0
price_2 : 0
page : 0


In [10]:
# Label-encode Clothing_Model: fit a StringIndexer on the string codes, then swap the
# original column for its numeric index while keeping the original column name.
from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(inputCol="Clothing_Model", outputCol="Clothing_Model_label")
indexed_df = (
    indexer.fit(inputDF1)
    .transform(inputDF1)
    .drop("Clothing_Model")
    .withColumnRenamed("Clothing_Model_label", "Clothing_Model")
)
print("The output dataframe is:")
indexed_df.show(n=5)

The output dataframe is:
+-----+-------+----------+----------------+------+--------+-----------------+-----+-------+----+--------------+
|order|country|session_ID|Product_Category|colour|location|model_photography|price|price_2|page|Clothing_Model|
+-----+-------+----------+----------------+------+--------+-----------------+-----+-------+----+--------------+
|    1|     29|         1|               1|     1|       5|                1|   28|      2|   1|          19.0|
|    2|     29|         1|               1|     1|       6|                1|   33|      2|   1|          34.0|
|    3|     29|         1|               2|    10|       2|                1|   52|      1|   1|           0.0|
|    4|     29|         1|               2|     6|       6|                2|   38|      2|   1|          28.0|
|    5|     29|         1|               2|     4|       3|                2|   52|      1|   1|         121.0|
+-----+-------+----------+----------------+------+--------+-----------------+--

In [11]:
from pyspark.sql.functions import col, exp
def iqr_outlier_treatment(dataframe, columns, factor=1.5, relative_error=0.01):
    """Remove rows whose values lie outside the IQR fences of the given columns.

    For each column the first and third quartiles are estimated with
    ``DataFrame.approxQuantile`` and only rows inside
    ``[Q1 - factor * IQR, Q3 + factor * IQR]`` are kept.

    Columns are processed one after another: the quartiles of a later column
    are computed on the frame as already filtered by the earlier columns, so
    the result can depend on the order of ``columns``.

    Args:
        dataframe: Spark DataFrame to filter.
        columns: Name of a numeric column, or an iterable of such names.
        factor: Multiplier applied to the IQR to obtain the fences.
        relative_error: Relative error passed to ``approxQuantile``
            (0.0 requests exact quantiles).

    Returns:
        The filtered DataFrame. Rows holding NULL in a processed column do not
        satisfy the range comparison and are therefore dropped as well. A
        column for which no quartiles can be computed is left unfiltered.
    """
    # A bare string would otherwise be iterated character by character.
    if isinstance(columns, str):
        columns = [columns]

    for column in columns:
        quantiles = dataframe.approxQuantile(column, [0.25, 0.75], relative_error)
        if len(quantiles) < 2:
            # approxQuantile returns an empty list when the frame is empty or the
            # column only holds NULL/NaN; there are no fences to apply then.
            continue
        q1, q3 = quantiles
        iqr = q3 - q1

        # Fences beyond which a value counts as an outlier.
        lower_bound = q1 - factor * iqr
        upper_bound = q3 + factor * iqr

        # Keep only the rows inside the fences; the column is looked up on the
        # frame being filtered, so no module-level `col` import is required.
        column_ref = dataframe[column]
        dataframe = dataframe.filter((column_ref >= lower_bound) & (column_ref <= upper_bound))

    return dataframe

In [12]:
# 'order' and 'price' are the only continuous columns, so only they are screened for outliers.
Continuous_cols = ['order', 'price']
df_outlier_treatment = iqr_outlier_treatment(
    dataframe=indexed_df,
    columns=Continuous_cols,
    factor=1.5,
)
df_outlier_treatment.show(n=5)

+-----+-------+----------+----------------+------+--------+-----------------+-----+-------+----+--------------+
|order|country|session_ID|Product_Category|colour|location|model_photography|price|price_2|page|Clothing_Model|
+-----+-------+----------+----------------+------+--------+-----------------+-----+-------+----+--------------+
|    1|     29|         1|               1|     1|       5|                1|   28|      2|   1|          19.0|
|    2|     29|         1|               1|     1|       6|                1|   33|      2|   1|          34.0|
|    3|     29|         1|               2|    10|       2|                1|   52|      1|   1|           0.0|
|    4|     29|         1|               2|     6|       6|                2|   38|      2|   1|          28.0|
|    5|     29|         1|               2|     4|       3|                2|   52|      1|   1|         121.0|
+-----+-------+----------+----------------+------+--------+-----------------+-----+-------+----+--------

In [13]:
# Number of rows remaining after the outlier treatment.
df_outlier_treatment.count() 
# 13,365 rows were dropped as a result of outlier detection on the order and price columns

152109

In [14]:
# User-session-level data: number of rows recorded for each session_ID.
# NOTE(review): this starts from inputDF1, so the label-encoded and outlier-filtered frames
# built earlier (indexed_df, df_outlier_treatment) are not used here - confirm that is intended.
inputDF2 = inputDF1.groupBy('session_ID').count()

# Data engineering: attach the per-session row count ('count') to every row of that session.
df_inner = inputDF1.join(inputDF2, 'session_ID', 'inner')
df_inner.columns

['session_ID',
 'order',
 'country',
 'Product_Category',
 'Clothing_Model',
 'colour',
 'location',
 'model_photography',
 'price',
 'price_2',
 'page',
 'count']

In [16]:
# Collapse to a single partition before writing the result out.
df_inner = df_inner.coalesce(1)
# mode("overwrite"): with the default save mode the write raises as soon as the target
# directory exists, so this cell failed on every re-run of the notebook.
# NOTE(review): no header row is written, so the column names are not stored in the
# output - confirm that downstream readers expect a header-less file.
df_inner.write.mode("overwrite").csv("file:///home/talentum/preprocessed_data")