In [0]:
%spark.pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import count, avg, sum, countDistinct, desc, col,lit

# Single Spark session shared by every cell in this notebook (Hive support enabled).
spark = (
    SparkSession.builder
    .appName("e-commerce")
    .enableHiveSupport()
    .getOrCreate()
)

In [1]:
%spark.pyspark
# Raw shipment records: first CSV row is the header, column types are inferred.
logistics_csv_path = "hdfs://namenode:9000/user/menna/logistics_info.csv"
logistics_df = spark.read.csv(logistics_csv_path, inferSchema=True, header=True)


In [2]:
%spark.pyspark
from pyspark.sql import DataFrame

# Drop the pre-existing actual_delivery_date column; a simulated value is
# generated later from each order's expected delivery date.
# The CSV header may carry this name with stray whitespace (e.g.
# " actual_delivery_date"), and DataFrame.drop() silently ignores names that
# do not match exactly — so match on the whitespace-stripped name instead of
# hard-coding one spelling.
stale_delivery_cols = [
    c for c in logistics_df.columns if c.strip() == "actual_delivery_date"
]
if stale_delivery_cols:
    logistics_df = logistics_df.drop(*stale_delivery_cols)


In [3]:
%spark.pyspark
# Preview the first few shipment rows.
logistics_df.show(n=5)

In [4]:
%spark.pyspark
# Raw order records (header row, inferred column types); preview the first rows.
order_csv_path = "hdfs://namenode:9000/user/menna/order_info.csv"
order_df = spark.read.csv(order_csv_path, inferSchema=True, header=True)
order_df.show(n=5)

In [5]:
%spark.pyspark
# Remove the auto-named _c6.._c9 columns — presumably empty trailing header
# cells in order_info.csv (TODO confirm against the file).
unnamed_order_cols = ("_c6", "_c7", "_c8", "_c9")
order_df = order_df.drop(*unnamed_order_cols)


In [6]:
%spark.pyspark
# Confirm the unnamed columns are gone.
order_df.show(n=5)


In [7]:
%spark.pyspark
from pyspark.ml.feature import StringIndexer

# Derive a numeric warehouse_id from order_country so that each country gets
# its own warehouse id. StringIndexer's output index is numeric (double in
# Spark ML), which is why warehouse_id is cast to int before being written out.
#
# Drop any warehouse_id left over from a previous run of this cell first:
# StringIndexer refuses to fit/transform when its output column already exists,
# so without this the cell fails on re-execution. drop() is a no-op on a fresh run.
order_df = order_df.drop("warehouse_id")

indexer = StringIndexer(inputCol="order_country", outputCol="warehouse_id")
indexer_model = indexer.fit(order_df)
order_df = indexer_model.transform(order_df)

order_df.select("order_id", "order_country", "warehouse_id").show(10, truncate=False)



In [8]:
%spark.pyspark
from pyspark.sql import functions as F
# Number of distinct order_country values (distinct() keeps a null country as its own value).
distinct_countries = order_df.select("order_country").distinct()
num_countries = distinct_countries.count()
print(num_countries)


In [9]:
%spark.pyspark
# Number of orders per country.
country_counts = (
    order_df
    .groupBy("order_country")
    .count()
)
country_counts.show()


In [10]:
%spark.pyspark
from pyspark.sql import functions as F

# Expected delivery is fixed at order_date + 5 days.
# NOTE(review): date_add needs order_date to be (or parse as) a date; if the
# inferred column is a string in another format the result will be null — confirm.
order_df = order_df.withColumn(
    "expected_delivery_date",
    F.date_add("order_date", 5),
)
order_df.show(n=10)


In [11]:
%spark.pyspark

# Simulate an actual delivery date for each shipment relative to its order's
# expected delivery date: ~33% one day early, ~33% on time, ~34% two days late.
#
# One uniform draw per row drives every branch. Calling F.rand() separately in
# each condition would evaluate the second threshold on an independent draw,
# skewing the split to roughly 33% / 44% / 23%.
# A fixed seed makes the simulated dates repeatable across notebook runs
# (given the same data partitioning).
DELIVERY_SIM_SEED = 42

logistics_df = logistics_df.join(
    order_df.select("order_id", "expected_delivery_date"),
    on="order_id",
    how="left"
)

delivery_roll = F.col("_delivery_roll")
logistics_df = (
    logistics_df
    .withColumn("_delivery_roll", F.rand(seed=DELIVERY_SIM_SEED))
    .withColumn(
        "actual_delivery_date",
        F.when(delivery_roll < 0.33, F.date_sub(F.col("expected_delivery_date"), 1))  # early
         .when(delivery_roll < 0.66, F.col("expected_delivery_date"))                # on time
         .otherwise(F.date_add(F.col("expected_delivery_date"), 2))                  # late
    )
    # Helper roll and expected date are not part of the logistics record.
    .drop("_delivery_roll", "expected_delivery_date")
)
logistics_df.show()

In [12]:
%spark.pyspark
# Inspect the simulated actual_delivery_date values.
logistics_df.show(n=10)

In [13]:
%spark.pyspark

from pyspark.sql import functions as F

# Attach each shipment's expected_delivery_date and warehouse_id from order_df
# (shipping_cost is already a column of logistics_df). The left join keeps
# every shipment even when no matching order exists.
order_lookup = order_df.select("order_id", "expected_delivery_date", "warehouse_id")
joined_df = logistics_df.join(order_lookup, on="order_id", how="left")

# delivery_time_delta_days = actual - expected, in days (positive means late).
# is_late_delivery is True only for a positive delta; a null delta (a missing
# date on either side) is reported as False rather than null.
delta_days = F.datediff(F.col("actual_delivery_date"), F.col("expected_delivery_date"))
joined_df = (
    joined_df
    .withColumn("delivery_time_delta_days", delta_days)
    .withColumn(
        "is_late_delivery",
        F.coalesce(F.col("delivery_time_delta_days") > 0, F.lit(False)),
    )
)



In [14]:
%spark.pyspark
# Staging fact table: delivery metrics plus cost and warehouse for each joined
# logistics record. Every column listed here is present in joined_df.
fact_columns = [
    "order_id",
    "delivery_time_delta_days",
    "is_late_delivery",
    "shipping_cost",
    "warehouse_id",
]
Staging_Logistics_Fact = joined_df.select(*fact_columns)

Staging_Logistics_Fact.show()

In [15]:
%spark.pyspark
from pyspark.sql import functions as F

# Round order_id and shipping_cost to 0 decimal places (nearest integer).
# Use the namespaced F.round: `from pyspark.sql.functions import round` would
# shadow Python's built-in round() for every cell that runs after this one.
Staging_Logistics_Fact = (
    Staging_Logistics_Fact
    .withColumn("order_id", F.round(F.col("order_id"), 0))
    .withColumn("shipping_cost", F.round(F.col("shipping_cost"), 0))
)


In [16]:
%spark.pyspark
# Inspect the rounded staging rows (20 rows is show()'s default).
Staging_Logistics_Fact.show(n=20)

In [17]:
%spark.pyspark
# How many shipments are late vs. not late.
late_counts = Staging_Logistics_Fact.groupBy("is_late_delivery").count()
late_counts.show()


In [18]:
%spark.pyspark
# Column names of the staging fact table.
fact_column_names = Staging_Logistics_Fact.columns
print(fact_column_names)


In [19]:
%spark.pyspark
# Register the staging fact table so it can be queried with Spark SQL.
Staging_Logistics_Fact.createOrReplaceTempView("staging_logistics_fact")

# Mean delivery delta (actual - expected, in days) per warehouse.
avg_delta_query = """
    SELECT 
        warehouse_id,
        AVG(delivery_time_delta_days) AS avg_delivery_time_delta
    FROM staging_logistics_fact
    GROUP BY warehouse_id
"""
avgDeliveryTimeDelta = spark.sql(avg_delta_query)
avgDeliveryTimeDelta.show()



In [20]:
%spark.pyspark
# Persist the per-warehouse averages as parquet, replacing any previous output.
# NOTE(review): warehouse_id is written here uncast (numeric index from
# StringIndexer), whereas staging_logistics_fact casts it to int — consider
# casting here too so the two datasets share a key type.
avg_delta_output_path = "hdfs://namenode:9000/user/bigdata/avgDeliveryTimeDelta"
avgDeliveryTimeDelta.write.mode("overwrite").parquet(avg_delta_output_path)


In [21]:
%spark.pyspark
from pyspark.sql.functions import col

# Pin explicit column types before writing the staging fact table, so the
# parquet schema does not depend on what inferSchema / StringIndexer produced
# (e.g. warehouse_id becomes int, shipping_cost becomes float).
fact_column_types = [
    ("order_id", "int"),
    ("delivery_time_delta_days", "int"),
    ("is_late_delivery", "boolean"),
    ("shipping_cost", "float"),
    ("warehouse_id", "int"),
]
Staging_Logistics_Fact_fixed = Staging_Logistics_Fact.select(
    *[col(name).cast(dtype) for name, dtype in fact_column_types]
)

Staging_Logistics_Fact_fixed.write.mode("overwrite").parquet(
    "hdfs://namenode:9000/user/bigdata/staging_logistics_fact"
)


In [22]:
%spark.pyspark
