In [1]:
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

In [3]:
df = spark.read \
    .option("header", "true") \
    .csv ('dataset.csv')

df.printSchema()

root
 |-- accessed_date: string (nullable = true)
 |-- duration_(secs): string (nullable = true)
 |-- network_protocol: string (nullable = true)
 |-- ip: string (nullable = true)
 |-- bytes: string (nullable = true)
 |-- accessed_Ffom: string (nullable = true)
 |-- age: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- country: string (nullable = true)
 |-- membership: string (nullable = true)
 |-- language: string (nullable = true)
 |-- sales: string (nullable = true)
 |-- returned: string (nullable = true)
 |-- returned_amount: string (nullable = true)
 |-- pay_method: string (nullable = true)



In [18]:
from pyspark.sql import types
from pyspark.sql.functions import when
from pyspark.sql.functions import expr
from pyspark.sql.functions import initcap

schema = types.StructType([
    types.StructField('accessed_date', types.TimestampType(), True),
    types.StructField('duration_(secs)', types.IntegerType(), True),
    types.StructField('network_protocol', types.StringType(), True),
    types.StructField('ip', types.StringType(), True),
    types.StructField('bytes', types.IntegerType(), True),
    types.StructField('accessed_Ffom', types.StringType(), True),
    types.StructField('age', types.IntegerType(), True),
    types.StructField('gender', types.StringType(), True),
    types.StructField('country', types.StringType(), True),
    types.StructField('membership', types.StringType(), True),
    types.StructField('language', types.StringType(), True),
    types.StructField('sales', types.DoubleType(), True),
    types.StructField('returned', types.StringType(), True),
    types.StructField('returned_amount', types.DoubleType(), True),
    types.StructField('pay_method', types.StringType(), True)
])

df = spark.read \
    .option("header", "true") \
    .schema(schema) \
    .csv ('dataset.csv')

df =(
    df \
            .withColumnRenamed('duration_(secs)', 'duration_sec') \
            .withColumnRenamed('accessed_Ffom', 'accessed_from') \
            .withColumnRenamed('membership', 'account_type') \
            .withColumnRenamed('returned', 'refunded') \
            .withColumnRenamed('returned_amount', 'refunded_amount') \
            .withColumnRenamed('pay_method', 'payment_method')
)

df = df.withColumn(
    'payment_method',
    when(df['payment_method'] == 'Debit Card', 'MasterCard')
    .when(df['payment_method'] == 'Credit Card', 'Visa')
    .when(df['payment_method'] == 'Cash', 'PayPal')
    .otherwise('Other')
)

df = df.withColumn(
    'accessed_from',
    when(df['accessed_from'] == 'SafFRi', 'Safari')
    .when(df['accessed_from'] == 'Others', 'Other')
    .otherwise(df['accessed_from'])
)

df = df.withColumn(
    'network_protocol',
    when(df['network_protocol'] == 'HTTP  ', 'HTTP')
    .otherwise(df['network_protocol'])
)

df = df.withColumn(
    'gender',
    when(df['gender'] == 'Unknown', None)
    .otherwise(df['gender'])
)

df = df.withColumn("accessed_date", expr("make_timestamp(2025, month(accessed_date), day(accessed_date), hour(accessed_date), minute(accessed_date), second(accessed_date))"))

df = df.withColumn("language", initcap(df["language"]))

df.printSchema()

root
 |-- accessed_date: timestamp (nullable = true)
 |-- duration_sec: integer (nullable = true)
 |-- network_protocol: string (nullable = true)
 |-- ip: string (nullable = true)
 |-- bytes: integer (nullable = true)
 |-- accessed_from: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- country: string (nullable = true)
 |-- account_type: string (nullable = true)
 |-- language: string (nullable = true)
 |-- sales: double (nullable = true)
 |-- refunded: string (nullable = true)
 |-- refunded_amount: double (nullable = true)
 |-- payment_method: string (nullable = false)



In [17]:
df = df.repartition(10)

df.write.parquet('dataset_repartitioned_parquet')

df = spark.read.parquet('dataset_repartitioned_parquet')

df.show()

+-------------------+------------+----------------+---------------+-----+---------------+----+-------+-------+-------------+--------+------+--------+---------------+--------------+
|      accessed_date|duration_sec|network_protocol|             ip|bytes|  accessed_from| age| gender|country| account_type|language| sales|refunded|refunded_amount|payment_method|
+-------------------+------------+----------------+---------------+-----+---------------+----+-------+-------+-------------+--------+------+--------+---------------+--------------+
|2025-03-15 22:03:13|        2583|           TCP  |212.121.212.197| 1726|          Other|  32|   Male|     IN|       Normal| Italian|  30.3|      No|            0.0|         Other|
|2025-03-16 14:46:40|        2764|           TCP  |  176.31.228.78|  556|          Other|  27|   Male|     IT|       Normal|  French| 40.66|      No|            0.0|          Visa|
|2025-03-15 09:32:36|        3723|            HTTP|  86.31.109.178|  579|        IOS App|  24| 