In [1]:
from pyspark.sql import SparkSession

# Build (or reuse, via getOrCreate) the notebook's Spark session.
#   - executors are configured with 4g of memory
#   - fs.defaultFS points at the HDFS namenode on localhost:9000
#   - Hive support is switched on
spark = (
    SparkSession.builder
    .appName("ECommerceDWH")
    .config("spark.executor.memory", "4g")
    .config("spark.hadoop.fs.defaultFS", "hdfs://localhost:9000")
    .enableHiveSupport()
    .getOrCreate()
)

# Confirmation message, then the session object itself as the cell's output.
print("Spark Session Created Successfully!")
spark

25/03/15 17:05:50 WARN Utils: Your hostname, a4ankan resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface enp0s3)
25/03/15 17:05:50 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/03/15 17:05:51 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Spark Session Created Successfully!


In [2]:
# Shell escape: list the top-level directory of the Hadoop filesystem.
!hadoop fs -ls /

Found 1 items
drwxr-xr-x   - vboxuser supergroup          0 2025-03-15 17:00 /retail_db


In [3]:
# Shell escape: long listing (human-readable sizes) of the local project
# directory on the driver machine's filesystem.
!ls -lh /home/vboxuser/Desktop/retailDB/retail_db/


total 44M
-rw-rw-r-- 1 vboxuser vboxuser 16K Mar 15 17:04 analysis.ipynb
-rw-r--r-- 1 vboxuser vboxuser 44M Sep 20  2019 data.csv
-rw-rw-r-- 1 vboxuser vboxuser 12K Mar 15 16:01 LICENSE
-rw-rw-r-- 1 vboxuser vboxuser  11 Mar 15 16:01 README.md


In [4]:
from pathlib import Path

# Local directory containing the project files; later cells build file
# locations from this path.
data_path = Path("/home/vboxuser/Desktop/retailDB/retail_db/")

# Show every entry that glob("*") yields directly under data_path.
print("Files in directory:", [*data_path.glob("*")])





Files in directory: [PosixPath('/home/vboxuser/Desktop/retailDB/retail_db/.ipynb_checkpoints'), PosixPath('/home/vboxuser/Desktop/retailDB/retail_db/.git'), PosixPath('/home/vboxuser/Desktop/retailDB/retail_db/analysis.ipynb'), PosixPath('/home/vboxuser/Desktop/retailDB/retail_db/LICENSE'), PosixPath('/home/vboxuser/Desktop/retailDB/retail_db/README.md'), PosixPath('/home/vboxuser/Desktop/retailDB/retail_db/data.csv')]


In [6]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType, TimestampType

# Explicit schema for the CSV: eight columns, all nullable.
# InvoiceDate is deliberately read as a plain string here and is meant to be
# converted to a proper date/time type in a later step.
schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in (
        ("InvoiceNo", StringType()),
        ("StockCode", StringType()),
        ("Description", StringType()),
        ("Quantity", IntegerType()),
        ("InvoiceDate", StringType()),
        ("UnitPrice", DoubleType()),
        ("CustomerID", IntegerType()),
        ("Country", StringType()),
    )
])

# Load data.csv from data_path using an explicit file:// URI, treating the
# first line as the header and applying the schema above instead of inferring.
# NOTE(review): no encoding/parse mode is set, so Spark's CSV defaults apply —
# confirm they match the file.
df = (
    spark.read
    .format("csv")
    .option("header", True)
    .schema(schema)
    .load(f"file://{data_path}/data.csv")
)

# Quick sanity check: preview the first five rows, then print the schema.
df.show(5)
df.printSchema()



+---------+---------+--------------------+--------+--------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|   InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+--------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|12/1/2010 8:26|     2.55|     17850|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|12/1/2010 8:26|     2.75|     17850|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|
+---------+---------+--------------------+--------+--------------+---------+----------+--------------+
only showing top 5 rows

root
 |-- InvoiceNo: string (nullable = true)
 |

In [7]:
# Mark the DataFrame for caching, then run count() on it; the count is the
# cell's displayed output.
# NOTE(review): presumably the count is here to trigger a job so the cache is
# actually populated — confirm against Spark's caching documentation.
df = df.cache()
df.count()

                                                                                

541909

In [8]:
# Count the rows again on the same DataFrame.
# NOTE(review): presumably repeated to observe the effect of the cache() call
# in the preceding cell — confirm intent, otherwise this cell is redundant.
df.count()

541909

In [9]:
import datetime

# Today's local calendar date as an ISO-8601 string (YYYY-MM-DD).
today_date = datetime.date.today().isoformat()
today_date


'2025-03-15'

In [10]:
# Persist the DataFrame as Parquet under a folder named after today_date.
# mode("overwrite") replaces whatever already exists at that location.
# NOTE(review): the target is a relative path, so where it lands depends on
# the session's default filesystem and working directory — confirm that this
# is the intended location.
(
    df.write
    .format("parquet")
    .mode("overwrite")
    .save(f"retail_db/{today_date}/raw.parquet")
)

                                                                                

In [11]:
# Shell escape: list the dated output folder in the Hadoop filesystem.
# {today_date} is filled in by IPython from the Python variable of that name,
# and the path is relative (no leading slash).
!hadoop fs -ls retail_db/{today_date}/

Found 1 items
drwxr-xr-x   - vboxuser supergroup          0 2025-03-15 17:07 retail_db/2025-03-15/raw.parquet
