In [7]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, StringType, IntegerType, DateType, DoubleType
from pyspark.sql import functions as F
import psycopg2, uuid

In [54]:
spark = SparkSession.builder.appName("Creating_Tables")\
    .getOrCreate()


schema = StructType([
    StructField("customer_id", StringType(), True),
    StructField("customer_first_name", StringType(), True),
    StructField("customer_last_name", StringType(), True),
    StructField("category_name", StringType(), True),
    StructField("product_name", StringType(), True),
    StructField("customer_segment", StringType(), True),
    StructField("customer_city", StringType(), True),
    StructField("customer_state", StringType(), True),
    StructField("customer_country", StringType(), True),
    StructField("customer_region", StringType(), True),
    StructField("delivery_status", StringType(), True),
    StructField("order_date", StringType(), True),
    StructField("order_id", StringType(), True),
    StructField("ship_date", StringType(), True),
    StructField("shipping_type", StringType(), True),
    StructField("days_for_shipment_scheduled", IntegerType(), True),
    StructField("days_for_shipment_real", IntegerType(), True),
    StructField("order_item_discount", DoubleType(), True),
    StructField("sales_per_order", DoubleType(), True),
    StructField("order_quantity", IntegerType(), True),
    StructField("profit_per_order", DoubleType(), True)
])

In [66]:
df = spark.read.option("header",True).schema(schema).csv("../data/ecommerce/Ecommerce_data.csv")

In [67]:
order = df.select(
    "order_id",
    "order_date",
    "delivery_status",
    "shipping_type",
    "ship_date",
    "days_for_shipment_scheduled",
    "days_for_shipment_real"
)

order.show(10)
# df.printSchema()

+------------+----------+-----------------+--------------+----------+---------------------------+----------------------+
|    order_id|order_date|  delivery_status| shipping_type| ship_date|days_for_shipment_scheduled|days_for_shipment_real|
+------------+----------+-----------------+--------------+----------+---------------------------+----------------------+
|O_ID_3001072| 11/5/2022| Shipping on time|  Second Class| 11/7/2022|                          2|                     2|
|O_ID_3009170|20-06-2022|Shipping canceled|  Second Class|23-06-2022|                          2|                     3|
|O_ID_3047567|25-06-2022|    Late delivery|Standard Class|30-06-2022|                          4|                     5|
|O_ID_3060575| 10/6/2022|    Late delivery|  Second Class|10/10/2022|                          2|                     4|
|O_ID_3064311|  2/5/2022|    Late delivery|   First Class|  8/1/2022|                          1|                     2|
|O_ID_3074984|24-06-2022| Shippi

In [81]:
# df.printSchema()
# df = df.withColumn("order_date", F.to_date("order_date", "yyyy-MM-dd"))
# df = df.withColumn("ship_date", F.to_date("ship_date", "yyyy-MM-dd"))


df = df.withColumn("order_date_p1",
    F.coalesce(
        F.to_date(F.col("order_date"), "d/M/yyyy"),  # Handles "11/5/2022"
        F.to_date(F.col("order_date"), "dd-MM-yyyy") # Handles "20-06-2022"
    ))
# Drop the old column and rename the new one
df = df.drop("order_date").withColumnRenamed("order_date_p1", "order_date")



df = df.withColumn("ship_date_p1",
    F.coalesce(
        F.to_date(F.col("ship_date"), "d/M/yyyy"),  # Handles "11/5/2022"
        F.to_date(F.col("ship_date"), "dd-MM-yyyy") # Handles "20-06-2022"
    ))
# Drop the old column and rename the new one
df = df.drop("ship_date").withColumnRenamed("ship_date_p1", "ship_date")

In [83]:
order = df.select(
    "order_id",
    "order_date",
    "delivery_status",
    "shipping_type",
    "ship_date",
    "days_for_shipment_scheduled",
    "days_for_shipment_real"
)

order.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- order_date: date (nullable = true)
 |-- delivery_status: string (nullable = true)
 |-- shipping_type: string (nullable = true)
 |-- ship_date: date (nullable = true)
 |-- days_for_shipment_scheduled: integer (nullable = true)
 |-- days_for_shipment_real: integer (nullable = true)



In [84]:
uuid_udf = F.udf(lambda: str(uuid.uuid4()), StringType())

## Extracting unique Customers infomation

In [85]:
customer_df = df.select(
    "customer_id",
    "customer_first_name",
    "customer_last_name",
    "customer_segment",
    "customer_city",
    "customer_state",
    "customer_country",
    "customer_region"
).dropDuplicates(["customer_id"])
customer_df.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- customer_first_name: string (nullable = true)
 |-- customer_last_name: string (nullable = true)
 |-- customer_segment: string (nullable = true)
 |-- customer_city: string (nullable = true)
 |-- customer_state: string (nullable = true)
 |-- customer_country: string (nullable = true)
 |-- customer_region: string (nullable = true)



In [95]:
customer_df.show(10)

+-----------+-------------------+------------------+----------------+-------------+--------------+----------------+---------------+
|customer_id|customer_first_name|customer_last_name|customer_segment|customer_city|customer_state|customer_country|customer_region|
+-----------+-------------------+------------------+----------------+-------------+--------------+----------------+---------------+
| C_ID_25006|             Judith|              Derr|        Consumer|    Oceanside|      New York|   United States|           East|
| C_ID_25007|            Douglas|            Rawles|        Consumer|  Los Angeles|    California|   United States|           West|
| C_ID_25011|            Destiny|           Badders|       Corporate|  New Bedford| Massachusetts|   United States|           East|
| C_ID_25021|            Rebecca|         Stevenson|       Corporate|Oklahoma City|      Oklahoma|   United States|        Central|
| C_ID_25022|             Daniel|               Epp|        Consumer|      S

### Extracting unique category info and assigning id to the df

In [87]:
catergory_df = df.select(
    "category_name"
).distinct().withColumn("category_id", F.monotonically_increasing_id()+1)
catergory_df.printSchema()

root
 |-- category_name: string (nullable = true)
 |-- category_id: long (nullable = false)



In [88]:
catergory_df.show()

+---------------+-----------+
|  category_name|category_id|
+---------------+-----------+
|Office Supplies|          1|
|      Furniture|          2|
|     Technology|          3|
+---------------+-----------+



### Extracting unique Product info and assigning uuid to the df

In [89]:
# Join category_id with product data
product_df = df.select("product_name", "category_name").dropDuplicates(["product_name"])
product_df = product_df.join(
    catergory_df, "category_name", "left"
).select("product_name", "category_id")

# assigning uuid to the df
product_df = product_df.withColumn("product_id", uuid_udf())
product_df.printSchema()


root
 |-- product_name: string (nullable = true)
 |-- category_id: long (nullable = true)
 |-- product_id: string (nullable = true)



In [96]:
product_df.show(10)

+--------------------+-----------+--------------------+
|        product_name|category_id|          product_id|
+--------------------+-----------+--------------------+
|"""While you Were...|          1|cfe8609e-a4de-41b...|
|"#10- 4 1/8"" x 9...|          1|39e7b3d2-49c5-431...|
|"#10- 4 1/8"" x 9...|          1|dc3d5119-84a0-46e...|
|"#10-4 1/8"" x 9 ...|          1|f502a7f7-49e6-405...|
|"1.7 Cubic Foot C...|          1|40ee2c5d-920f-4ff...|
|"1/4 Fold Party D...|          1|7612f033-f31e-402...|
|"6"" Cubicle Wall...|          2|779c92cf-92f3-426...|
|"Acco Banker's Cl...|          1|7d9645b4-3660-41e...|
|"Acco Data Flex C...|          1|feae9e2e-1105-4d7...|
|"Acco Flexible AC...|          1|1e77664a-e617-487...|
+--------------------+-----------+--------------------+
only showing top 10 rows



### Extracting unique orders data

In [101]:
order_df = df.select(
    "order_id",
    "customer_id",
    "order_date",
    "delivery_status",
    "shipping_type",
    "ship_date",
    "days_for_shipment_scheduled",
    "days_for_shipment_real",
    "sales_per_order",
    "profit_per_order"
)
order_df.count()

113270

In [106]:
order_df.groupBy("order_id").count().orderBy("count").show()

+------------+-----+
|    order_id|count|
+------------+-----+
|O_ID_3000454|    1|
|O_ID_3000654|    1|
|O_ID_3056551|    1|
|O_ID_3059003|    1|
|O_ID_3001261|    1|
|O_ID_3001339|    1|
|O_ID_3001644|    1|
|O_ID_3001987|    1|
|O_ID_3032246|    1|
|O_ID_3002256|    1|
|O_ID_3002330|    1|
|O_ID_3002460|    1|
|O_ID_3002463|    1|
|O_ID_3002825|    1|
|O_ID_3002994|    1|
|O_ID_3003047|    1|
|O_ID_3003080|    1|
|O_ID_3003156|    1|
|O_ID_3003317|    1|
|O_ID_3003536|    1|
+------------+-----+
only showing top 20 rows



In [100]:
order_df.show(10)

+----------+-----------+----------+---------------+-------------+---------+---------------------------+----------------------+---------------+----------------+
|  order_id|customer_id|order_date|delivery_status|shipping_type|ship_date|days_for_shipment_scheduled|days_for_shipment_real|sales_per_order|profit_per_order|
+----------+-----------+----------+---------------+-------------+---------+---------------------------+----------------------+---------------+----------------+
|  1/1/2022| C_ID_58305|      NULL|           East|     4/4/2022|     NULL|                       NULL|                     4|    59.99000168|             5.0|
| 1/11/2021| C_ID_74446|      NULL|           East|    3/11/2021|     NULL|                       NULL|                     4|           32.0|             1.0|
|  1/2/2021| C_ID_67492|      NULL|           East|     3/2/2021|     NULL|                       NULL|                     4|    31.98999977|             5.0|
|  1/3/2021| C_ID_40112|      NULL|     

In [9]:
db_url = "jdbc:postgresql://postgres:5432/airflow"
db_properties = {"user": "airflow", "password": "airflow", "driver": "org.postgresql.Driver"}
df_new.write.format("jdbc") \
    .option("url", db_url) \
    .option("dbtable", "public.ecommerce") \
    .option("user", db_properties["user"]) \
    .option("password", db_properties["password"]) \
    .option("driver", db_properties["driver"]) \
    .mode("append") \
    .save()