In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.context import SparkContext
import warnings
from pyspark.sql.types import StructField, StructType, IntegerType, DateType, StringType, TimestampType, DoubleType, LongType
import pyspark.sql.functions as F
from pyspark.sql import DataFrame as SparkDataFrame
from datetime import datetime
import random
warnings.filterwarnings("ignore")

In [2]:
# connecting to GCS
credentials_loc = "/home/ezzaldin/.gc/online-store-382421-413f8a966a1f.json"
conf = SparkConf().setMaster("local[*]").setAppName("test")\
                  .set("spark.jars", "/home/ezzaldin/lib/gcs-connector-hadoop3-2.2.5.jar")\
                  .set("spark.hadoop.google.cloud.auth.service.account.enable", "true") \
                  .set("spark.hadoop.google.cloud.auth.service.account.json.keyfile", credentials_loc)
sc = SparkContext(conf = conf)
hadoop_conf = sc._jsc.hadoopConfiguration()
hadoop_conf.set("fs.AbstractFileSystem.gs.impl",  "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
hadoop_conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
hadoop_conf.set("fs.gs.auth.service.account.json.keyfile", credentials_loc)
hadoop_conf.set("fs.gs.auth.service.account.enable", "true")
spark = SparkSession.builder.config(conf=sc.getConf()).getOrCreate()

23/04/02 20:42:20 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [3]:
date_str_formula = datetime.now().strftime("%Y%m%d")
orders = spark.read.option("header", "true").parquet(f"gs://store_data_lake_online-store-382421/raw/orders{date_str_formula}.parquet.snappy")
customers = spark.read.option("header", "true").parquet(f"gs://store_data_lake_online-store-382421/raw/customers{date_str_formula}.parquet.snappy")
items = spark.read.option("header", "true").parquet(f"gs://store_data_lake_online-store-382421/raw/items{date_str_formula}.parquet.snappy")

                                                                                

In [4]:
orders.show(10)

[Stage 3:>                                                          (0 + 1) / 1]

+-----------+-------+----------+
|customer_id|   item|created_at|
+-----------+-------+----------+
|     422152|weights|2023-03-31|
|     514911|   bags|2023-03-30|
|     469441| laptop|2023-03-30|
|     110382| laptop|2023-03-30|
|     246744|   food|2023-03-30|
|     845726|weights|2023-03-30|
|     431718|   bags|2023-03-30|
|     561885|  chair|2023-03-31|
|     402350| laptop|2023-03-31|
|     537215|weights|2023-03-30|
+-----------+-------+----------+
only showing top 10 rows



                                                                                

In [5]:
customers.show(5)

+------+-------------+-------+--------------------+------+--------------------+------------------+------------------+-------------------------+
|    id|Date of Birth|country|             address|gender|credit_card_provider|credit_card_number|credit_card_expire|credit_card_security_code|
+------+-------------+-------+--------------------+------+--------------------+------------------+------------------+-------------------------+
|297189|   1985-12-03|     UK|31655 Caleb Walk\...|     F|       VISA 16 digit|    30480338025065|             05/25|                      621|
|606033|   1998-10-09|    FRA|50901 Kyle Crossr...|     F|       VISA 16 digit|  4859114084261596|             01/25|                      463|
|161625|   1972-10-04|     CH|6414 Dougherty Ov...|     M|    American Express|  3517011750725845|             03/25|                      260|
|278521|   1990-07-29|     CH|604 Cole Track\nL...|     M|       VISA 19 digit|  4390619341561154|             05/31|                   

In [6]:
items.show(5)

+------+-----+--------------------+-------------+--------------+------------------+
|  item|price|         description|discount_flag|last_day_sales|    last_day_score|
+------+-----+--------------------+-------------+--------------+------------------+
| chair|   16|Memory without li...|            Y|           452|2.9942648454315397|
|   car|   24|Those hour loss o...|            Y|           379|2.3326004004187446|
|   toy|   42|State perhaps far...|            N|           642| 3.541021170769279|
|laptop|   10|Floor half inform...|            N|           514| 2.946138887355685|
|   box|   25|Sing represent it...|            N|           325|2.9832246550925223|
+------+-----+--------------------+-------------+--------------+------------------+
only showing top 5 rows



In [7]:
orders.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- item: string (nullable = true)
 |-- created_at: string (nullable = true)



In [8]:
customers.printSchema()

root
 |-- id: string (nullable = true)
 |-- Date of Birth: string (nullable = true)
 |-- country: string (nullable = true)
 |-- address: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- credit_card_provider: string (nullable = true)
 |-- credit_card_number: string (nullable = true)
 |-- credit_card_expire: string (nullable = true)
 |-- credit_card_security_code: string (nullable = true)



In [9]:
items.printSchema()

root
 |-- item: string (nullable = true)
 |-- price: string (nullable = true)
 |-- description: string (nullable = true)
 |-- discount_flag: string (nullable = true)
 |-- last_day_sales: string (nullable = true)
 |-- last_day_score: string (nullable = true)



In [10]:
# adjust the items schema.
# first remove last_day_sales column as it is incorrect information.
# replace this by the number of orders created in orders dataframe.
# create unique identifier for each item.
pre_items = items.withColumn("item", items['item'].cast(StringType())) \
                 .withColumn("price", items['price'].cast(IntegerType())) \
                 .withColumn("description", items['description'].cast(StringType())) \
                 .withColumn("discount_flag", items['discount_flag'].cast(StringType())) \
                 .withColumn("last_day_score", items['last_day_score'].cast(DoubleType()))
pre_items.printSchema()

root
 |-- item: string (nullable = true)
 |-- price: integer (nullable = true)
 |-- description: string (nullable = true)
 |-- discount_flag: string (nullable = true)
 |-- last_day_sales: string (nullable = true)
 |-- last_day_score: double (nullable = true)



In [13]:
pre_orders = orders.withColumn("customer_id", orders['customer_id'].cast(IntegerType())) \
                   .withColumn("item", orders['item'].cast(StringType())) \
                   .withColumn("created_at", orders['created_at'].cast(DateType()))
pre_orders.printSchema()

root
 |-- customer_id: integer (nullable = true)
 |-- item: string (nullable = true)
 |-- created_at: date (nullable = true)



In [14]:
pre_orders.show(10)

+-----------+-------+----------+
|customer_id|   item|created_at|
+-----------+-------+----------+
|     422152|weights|2023-03-31|
|     514911|   bags|2023-03-30|
|     469441| laptop|2023-03-30|
|     110382| laptop|2023-03-30|
|     246744|   food|2023-03-30|
|     845726|weights|2023-03-30|
|     431718|   bags|2023-03-30|
|     561885|  chair|2023-03-31|
|     402350| laptop|2023-03-31|
|     537215|weights|2023-03-30|
+-----------+-------+----------+
only showing top 10 rows



In [24]:
pre_customers = customers.withColumn("id", customers['id'].cast(IntegerType())) \
                         .withColumn("Date of Birth", customers['Date of Birth'].cast(DateType())) \
                         .withColumn("country", customers['country'].cast(StringType())) \
                         .withColumn("address", customers['address'].cast(StringType())) \
                         .withColumn("gender", customers['gender'].cast(StringType())) \
                         .withColumn("credit_card_provider", customers['credit_card_provider'].cast(StringType())) \
                         .withColumn("credit_card_number", customers['credit_card_number'].cast(StringType())) \
                         .withColumn("credit_card_expire", customers['credit_card_expire'].cast(StringType())) \
                         .withColumn("credit_card_security_code", customers['credit_card_security_code'].cast(IntegerType())) \
                         .withColumnRenamed("Date of Birth", "date_of_birth")
pre_customers.printSchema()

root
 |-- id: integer (nullable = true)
 |-- date_of_birth: date (nullable = true)
 |-- country: string (nullable = true)
 |-- address: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- credit_card_provider: string (nullable = true)
 |-- credit_card_number: string (nullable = true)
 |-- credit_card_expire: string (nullable = true)
 |-- credit_card_security_code: integer (nullable = true)



In [25]:
pre_customers.show(5)

+------+-------------+-------+--------------------+------+--------------------+------------------+------------------+-------------------------+
|    id|date_of_birth|country|             address|gender|credit_card_provider|credit_card_number|credit_card_expire|credit_card_security_code|
+------+-------------+-------+--------------------+------+--------------------+------------------+------------------+-------------------------+
|297189|   1985-12-03|     UK|31655 Caleb Walk\...|     F|       VISA 16 digit|    30480338025065|             05/25|                      621|
|606033|   1998-10-09|    FRA|50901 Kyle Crossr...|     F|       VISA 16 digit|  4859114084261596|             01/25|                      463|
|161625|   1972-10-04|     CH|6414 Dougherty Ov...|     M|    American Express|  3517011750725845|             03/25|                      260|
|278521|   1990-07-29|     CH|604 Cole Track\nL...|     M|       VISA 19 digit|  4390619341561154|             05/31|                   

In [18]:
# create a user defined function to create a unique code.
def item_code(item):
    code = ""
    for idx, val in enumerate(item):
        if idx == 0 or idx == len(item)-1:
            code+=str(ord(val))
        else:
            code+=val
    return code

In [19]:
item_code("box")

'98o120'

In [20]:
item_codes_maker = F.udf(item_code, returnType = StringType())
pre_items = items.withColumn("iid", item_codes_maker(items["item"]))
pre_items.show(10)

[Stage 10:>                                                         (0 + 1) / 1]

+-------+-----+--------------------+-------------+--------------+------------------+-----------+
|   item|price|         description|discount_flag|last_day_sales|    last_day_score|        iid|
+-------+-----+--------------------+-------------+--------------+------------------+-----------+
|  chair|   16|Memory without li...|            Y|           452|2.9942648454315397|   99hai114|
|    car|   24|Those hour loss o...|            Y|           379|2.3326004004187446|     99a114|
|    toy|   42|State perhaps far...|            N|           642| 3.541021170769279|    116o121|
| laptop|   10|Floor half inform...|            N|           514| 2.946138887355685| 108apto112|
|    box|   25|Sing represent it...|            N|           325|2.9832246550925223|     98o120|
|   food|   34|Child beat stage ...|            Y|           146| 2.096610210238468|   102oo100|
|  shirt|   96|Central science r...|            N|           931|2.1078141466892495|  115hir116|
|weights|   26|Service near re

                                                                                

In [26]:
# replace the item name in orders by item id.
pre_orders = pre_orders.join(pre_items, pre_orders["item"] == pre_items["item"], "inner") \
                       .select(["iid", "customer_id", "created_at"])
pre_orders.show(10)

+-----------+-----------+----------+
|        iid|customer_id|created_at|
+-----------+-----------+----------+
|119eight115|     422152|2023-03-31|
|    98ag115|     514911|2023-03-30|
| 108apto112|     469441|2023-03-30|
| 108apto112|     110382|2023-03-30|
|   102oo100|     246744|2023-03-30|
|119eight115|     845726|2023-03-30|
|    98ag115|     431718|2023-03-30|
|   99hai114|     561885|2023-03-31|
| 108apto112|     402350|2023-03-31|
|119eight115|     537215|2023-03-30|
+-----------+-----------+----------+
only showing top 10 rows

