In [1]:
from pyspark.sql import SparkSession

# Create a Spark session
spark = SparkSession.builder \
    .appName("dataintegration") \
    .getOrCreate()

25/04/07 02:40:39 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [2]:
hdfs_path='/data/olist'

In [3]:
customers_df = spark.read.csv(hdfs_path + "/olist_customers_dataset.csv", header=True, inferSchema=True)
geolocation_df = spark.read.csv(hdfs_path + "/olist_geolocation_dataset.csv", header=True, inferSchema=True)
order_items_df = spark.read.csv(hdfs_path + "/olist_order_items_dataset.csv", header=True, inferSchema=True)
payments_df = spark.read.csv(hdfs_path + "/olist_order_payments_dataset.csv", header=True, inferSchema=True)
reviews_df = spark.read.csv(hdfs_path + "/olist_order_reviews_dataset.csv", header=True, inferSchema=True)
orders_df = spark.read.csv(hdfs_path + "/olist_orders_dataset.csv", header=True, inferSchema=True)
products_df = spark.read.csv(hdfs_path + "/olist_products_dataset.csv", header=True, inferSchema=True)
sellers_df = spark.read.csv(hdfs_path + "/olist_sellers_dataset.csv", header=True, inferSchema=True)
category_translation_df = spark.read.csv(hdfs_path + "/product_category_name_translation.csv", header=True, inferSchema=True)

                                                                                

In [4]:
spark

In [5]:
# Cache the DataFrames
order_items_df.cache()
customers_df.cache()
orders_df.cache()

DataFrame[order_id: string, customer_id: string, order_status: string, order_purchase_timestamp: timestamp, order_approved_at: timestamp, order_delivered_carrier_date: timestamp, order_delivered_customer_date: timestamp, order_estimated_delivery_date: timestamp]

In [6]:

order_items_joined_df = orders_df.join(order_items_df, 'order_id', 'inner')

orders_item_products_df = order_items_joined_df.join(products_df, 'product_id', 'inner')

orders_items_products_sellers_df = orders_item_products_df.join(sellers_df, 'seller_id', 'inner')

full_order_df = orders_items_products_sellers_df.join(customers_df, 'customer_id', 'inner')

# Fixing the geolocation join
full_order_df = full_order_df.join(
    geolocation_df,
    full_order_df.customer_zip_code_prefix == geolocation_df.geolocation_zip_code_prefix,
    'left'
)




In [7]:
full_order_df = full_order_df.join(reviews_df,'order_id','left')


In [8]:
full_order_df = full_order_df.join(payments_df, 'order_id', 'left')


In [9]:
full_order_df.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- order_purchase_timestamp: timestamp (nullable = true)
 |-- order_approved_at: timestamp (nullable = true)
 |-- order_delivered_carrier_date: timestamp (nullable = true)
 |-- order_delivered_customer_date: timestamp (nullable = true)
 |-- order_estimated_delivery_date: timestamp (nullable = true)
 |-- order_item_id: integer (nullable = true)
 |-- shipping_limit_date: timestamp (nullable = true)
 |-- price: double (nullable = true)
 |-- freight_value: double (nullable = true)
 |-- product_category_name: string (nullable = true)
 |-- product_name_lenght: integer (nullable = true)
 |-- product_description_lenght: integer (nullable = true)
 |-- product_photos_qty: integer (nullable = true)
 |-- product_weight_g: integer (nullable = true)
 |-- product_length_cm: integer (null

In [10]:
from pyspark.sql.functions import *

In [11]:
#total revenue per seller

seller_revenue_df = full_order_df.groupBy("seller_id").agg(sum("price"))


In [12]:
seller_revenue_df.show(5)

                                                                                

+--------------------+------------------+
|           seller_id|        sum(price)|
+--------------------+------------------+
|8e6cc767478edae94...|1145757.4000000872|
|4d600e08ecbe08258...| 436434.2300000007|
|62de60d81c55c29d7...|            8343.0|
|cb5ff1b9715e99589...|           13235.0|
|038b75b729c8a9a04...|           17979.5|
+--------------------+------------------+
only showing top 5 rows



In [15]:
customer_order_count_df = full_order_df.groupBy("customer_id").agg(count("order_id"))


In [16]:
customer_order_count_df.show(5)



+--------------------+---------------+
|         customer_id|count(order_id)|
+--------------------+---------------+
|8f4758b55faa41da5...|             46|
|bc12c0a740d705dbe...|             92|
|d9f57c5a009cd22a4...|            194|
|aa62a16b10e3fb24e...|            127|
|3a832bedadec4eeb6...|            221|
+--------------------+---------------+
only showing top 5 rows



                                                                                

In [17]:
most_sold_products_df = full_order_df.groupBy("product_id").count().orderBy("count", ascending=False)


In [18]:
most_sold_products_df.show(5)



+--------------------+-----+
|          product_id|count|
+--------------------+-----+
|aca2eb7d00ea1a7b8...|86740|
|422879e10f4668299...|81110|
|99a4788cb24856965...|78775|
|389d119b48cf3043d...|60248|
|d1c427060a0f73f6b...|59274|
+--------------------+-----+
only showing top 5 rows



                                                                                

In [19]:
customer_payment_df = full_order_df.groupBy("customer_id").agg(sum("payment_value")).orderBy("sum(payment_value)", ascending=True)

In [20]:
customer_payment_df.show(5)



+--------------------+------------------+
|         customer_id|sum(payment_value)|
+--------------------+------------------+
|86dc2ffce2dfff336...|              NULL|
|9d5ff5d7ce2e4bc38...|              20.7|
|6768f81d9f31aef88...|             25.51|
|cbb68c721ba9ddb30...|             26.78|
|03bbe0ce5c28e05f2...|             27.04|
+--------------------+------------------+
only showing top 5 rows



                                                                                

In [29]:
# advance aggregation and enrichments

In [21]:
from pyspark.sql.functions import count, sum, avg, round, desc

customer_spending_df = full_order_df.groupBy('customer_id') \
    .agg(
        count('order_id').alias('total_orders'),
        sum('price').alias('total_spent'),
        round(avg('price'), 2).alias('AOV')
    ) \
    .orderBy(desc('total_spent'))
customer_spending_df.show()



+--------------------+------------+------------------+-------+
|         customer_id|total_orders|       total_spent|    AOV|
+--------------------+------------+------------------+-------+
|d3e82ccec3cb5f956...|        6876|         6662844.0|  969.0|
|df55c14d1476a9a34...|         743|         3565657.0| 4799.0|
|fe5113a38e3575c04...|        2292|         3293604.0| 1437.0|
|ec5b2ba62e5743423...|        1428|         2556120.0| 1790.0|
|63b964e79dee32a35...|        6072|         2501664.0|  412.0|
|46bb3c0b1a65c8399...|         748|         2336752.0| 3124.0|
|05455dfa7cd02f13d...|        2184| 2160194.400000087|  989.1|
|3690e975641f01bd0...|         802|         2124498.0| 2649.0|
|349509b216bd5ec11...|         743|         1923627.0| 2589.0|
|695476b5848d64ba0...|         687|1820543.1299999943|2649.99|
|73236a0796f53d60d...|         832|         1755520.0| 2110.0|
|cc803a2c412833101...|         762|         1676400.0| 2200.0|
|1ff773612ab8934db...|        5820|1658641.7999999512| 

                                                                                

In [22]:
# Drop all duplicate review_score columns except one
full_order_df = full_order_df.drop("review_score")

In [23]:
# Assuming you have a separate reviews_df that includes review_score
full_order_df = full_order_df.join(reviews_df.select("review_id", "review_score"), on="review_id", how="left")


In [24]:
# seller performance metrics
from pyspark.sql.functions import count, sum, avg, stddev, round, desc

seller_performance_df = full_order_df.groupBy("seller_id") \
    .agg(
        count("order_id").alias("total_orders"),
        sum("price").alias("total_revenue"),
        round(avg("review_score"), 2).alias("avg_review_score"),
        round(stddev("price"), 2).alias("price_variability")
    ) \
    .orderBy(desc("total_revenue"))
seller_performance_df.show(5)



+--------------------+------------+--------------------+----------------+-----------------+
|           seller_id|total_orders|       total_revenue|avg_review_score|price_variability|
+--------------------+------------+--------------------+----------------+-----------------+
|4869f7a5dfa277a7d...|      185115|3.6197078119993314E7|             4.1|           111.59|
|4a3ca9315b744ce9f...|      338174|3.4583210840008125E7|            3.76|            59.51|
|53243585a1d6dc264...|       54608|3.4307572950000696E7|            4.11|           499.58|
|7c67e1448b00f6e96...|      236498| 3.275090671001961E7|            3.42|            50.45|
|da8622b14eb17ae28...|      273235| 3.101857953003215E7|             4.0|             73.0|
+--------------------+------------+--------------------+----------------+-----------------+
only showing top 5 rows



                                                                                

In [25]:
from pyspark.sql.functions import when, lit

# Add two new columns based on order_status
full_order_df = full_order_df.withColumn(
    "is_delivered", when(col("order_status") == "delivered", lit(1)).otherwise(lit(0))
).withColumn(
    "is_cancelled", when(col("order_status") == "canceled", lit(1)).otherwise(lit(0))
)


In [29]:
from pyspark.sql.functions import when, col

# Create binary flags: 1 if delivered/canceled, else 0
order_status_df = full_order_df.withColumn(
    "is_delivered", when(col("order_status") == "delivered", 1).otherwise(0)
).withColumn(
    "is_cancelled", when(col("order_status") == "canceled", 1).otherwise(0)
)

order_status_df.select("order_id", "order_status", "is_delivered", "is_cancelled").show(10)


[Stage 100:>                                                        (0 + 1) / 1]

+--------------------+------------+------------+------------+
|            order_id|order_status|is_delivered|is_cancelled|
+--------------------+------------+------------+------------+
|00010242fe8c5a6d1...|   delivered|           1|           0|
|00010242fe8c5a6d1...|   delivered|           1|           0|
|00010242fe8c5a6d1...|   delivered|           1|           0|
|00010242fe8c5a6d1...|   delivered|           1|           0|
|00010242fe8c5a6d1...|   delivered|           1|           0|
|00010242fe8c5a6d1...|   delivered|           1|           0|
|00010242fe8c5a6d1...|   delivered|           1|           0|
|00010242fe8c5a6d1...|   delivered|           1|           0|
|00010242fe8c5a6d1...|   delivered|           1|           0|
|00010242fe8c5a6d1...|   delivered|           1|           0|
+--------------------+------------+------------+------------+
only showing top 10 rows



                                                                                

In [30]:
order_status_df.select('order_status','is_delivered','is_cancelled').show(5)

[Stage 109:>                                                        (0 + 1) / 1]

+------------+------------+------------+
|order_status|is_delivered|is_cancelled|
+------------+------------+------------+
|   delivered|           1|           0|
|   delivered|           1|           0|
|   delivered|           1|           0|
|   delivered|           1|           0|
|   delivered|           1|           0|
+------------+------------+------------+
only showing top 5 rows



                                                                                

In [31]:
order_status_df.filter("is_cancelled == 1").select("order_status", "is_delivered", "is_cancelled").show()

[Stage 118:>                                                        (0 + 1) / 1]

+------------+------------+------------+
|order_status|is_delivered|is_cancelled|
+------------+------------+------------+
|    canceled|           0|           1|
|    canceled|           0|           1|
|    canceled|           0|           1|
|    canceled|           0|           1|
|    canceled|           0|           1|
|    canceled|           0|           1|
|    canceled|           0|           1|
|    canceled|           0|           1|
|    canceled|           0|           1|
|    canceled|           0|           1|
|    canceled|           0|           1|
|    canceled|           0|           1|
|    canceled|           0|           1|
|    canceled|           0|           1|
|    canceled|           0|           1|
|    canceled|           0|           1|
|    canceled|           0|           1|
|    canceled|           0|           1|
|    canceled|           0|           1|
|    canceled|           0|           1|
+------------+------------+------------+
only showing top

                                                                                

In [32]:
#order revenue 
full_order_df=full_order_df.withColumn('order_revenue',col('price')+col('freight_value'))

In [33]:
full_order_df.select("order_id", "order_revenue").show(5)

[Stage 127:>                                                        (0 + 1) / 1]

+--------------------+-------------+
|            order_id|order_revenue|
+--------------------+-------------+
|00010242fe8c5a6d1...|        72.19|
|00010242fe8c5a6d1...|        72.19|
|00010242fe8c5a6d1...|        72.19|
|00010242fe8c5a6d1...|        72.19|
|00010242fe8c5a6d1...|        72.19|
+--------------------+-------------+
only showing top 5 rows



                                                                                

In [34]:
from pyspark.sql.functions import when, col

customer_spending_df = customer_spending_df.withColumn(
    'customer_segment',
    when(col('AOV') >= 1200, 'High Spender')
    .when((col('AOV') < 1200) & (col('AOV') >= 500), 'Medium Spender')
    .otherwise('low-spender')
)


In [35]:
customer_spending_df.show(10)



+--------------------+------------+------------------+-------+----------------+
|         customer_id|total_orders|       total_spent|    AOV|customer_segment|
+--------------------+------------+------------------+-------+----------------+
|d3e82ccec3cb5f956...|        6876|         6662844.0|  969.0|  Medium Spender|
|df55c14d1476a9a34...|         743|         3565657.0| 4799.0|    High Spender|
|fe5113a38e3575c04...|        2292|         3293604.0| 1437.0|    High Spender|
|ec5b2ba62e5743423...|        1428|         2556120.0| 1790.0|    High Spender|
|63b964e79dee32a35...|        6072|         2501664.0|  412.0|     low-spender|
|46bb3c0b1a65c8399...|         748|         2336752.0| 3124.0|    High Spender|
|05455dfa7cd02f13d...|        2184| 2160194.400000087|  989.1|  Medium Spender|
|3690e975641f01bd0...|         802|         2124498.0| 2649.0|    High Spender|
|349509b216bd5ec11...|         743|         1923627.0| 2589.0|    High Spender|
|695476b5848d64ba0...|         687|18205

                                                                                

In [36]:
full_order_df = full_order_df.join(
    customer_spending_df.select('customer_id', 'customer_segment'),
    on='customer_id',
    how='left'
)


In [37]:
full_order_df.select('customer_id', 'customer_segment').show(10)

                                                                                

+--------------------+----------------+
|         customer_id|customer_segment|
+--------------------+----------------+
|c08ef557085ca9fb0...|     low-spender|
|c08ef557085ca9fb0...|     low-spender|
|c08ef557085ca9fb0...|     low-spender|
|c08ef557085ca9fb0...|     low-spender|
|c08ef557085ca9fb0...|     low-spender|
|c08ef557085ca9fb0...|     low-spender|
|c08ef557085ca9fb0...|     low-spender|
|c08ef557085ca9fb0...|     low-spender|
|c08ef557085ca9fb0...|     low-spender|
|c08ef557085ca9fb0...|     low-spender|
+--------------------+----------------+
only showing top 10 rows



In [38]:

#weekday vs weekend order
from pyspark.sql.functions import dayofweek, when

full_order_df = full_order_df.withColumn(
    "order_day_type",
    when(dayofweek("order_purchase_timestamp").isin([1, 7]), "Weekend")
    .otherwise("Weekday")
)


In [39]:
full_order_df.select('order_day_type', 'order_id').show(2)


[Stage 163:>                                                        (0 + 1) / 1]

+--------------+--------------------+
|order_day_type|            order_id|
+--------------+--------------------+
|       Weekday|00010242fe8c5a6d1...|
|       Weekday|00010242fe8c5a6d1...|
+--------------+--------------------+
only showing top 2 rows



                                                                                

In [40]:
full_order_df.filter(full_order_df.order_day_type == "Weekend").select("order_id").show()

                                                                                

+--------------------+
|            order_id|
+--------------------+
|000229ec398224ef6...|
|000229ec398224ef6...|
|000229ec398224ef6...|
|000229ec398224ef6...|
|000229ec398224ef6...|
|000229ec398224ef6...|
|000229ec398224ef6...|
|000229ec398224ef6...|
|000229ec398224ef6...|
|000229ec398224ef6...|
|000229ec398224ef6...|
|000229ec398224ef6...|
|000229ec398224ef6...|
|000229ec398224ef6...|
|000229ec398224ef6...|
|000229ec398224ef6...|
|000229ec398224ef6...|
|000229ec398224ef6...|
|000229ec398224ef6...|
|000229ec398224ef6...|
+--------------------+
only showing top 20 rows



                                                                                

In [41]:
!hadoop fs -ls /data/olist/

Found 10 items
-rw-r--r--   2 abhishekpandey68461 hadoop    9033957 2025-03-30 13:16 /data/olist/olist_customers_dataset.csv
-rw-r--r--   2 abhishekpandey68461 hadoop   61273883 2025-03-30 13:16 /data/olist/olist_geolocation_dataset.csv
-rw-r--r--   2 abhishekpandey68461 hadoop   15438671 2025-03-30 13:16 /data/olist/olist_order_items_dataset.csv
-rw-r--r--   2 abhishekpandey68461 hadoop    5777138 2025-03-30 13:16 /data/olist/olist_order_payments_dataset.csv
-rw-r--r--   2 abhishekpandey68461 hadoop   14451670 2025-03-30 13:16 /data/olist/olist_order_reviews_dataset.csv
-rw-r--r--   2 abhishekpandey68461 hadoop   17654914 2025-03-30 13:16 /data/olist/olist_orders_dataset.csv
-rw-r--r--   2 abhishekpandey68461 hadoop    2379446 2025-03-30 13:16 /data/olist/olist_products_dataset.csv
-rw-r--r--   2 abhishekpandey68461 hadoop     174703 2025-03-30 13:16 /data/olist/olist_sellers_dataset.csv
drwxrwxr-x   - root                hadoop          0 2025-04-07 02:15 /data/olist/processed
-rw-r-

In [43]:
!hadoop fs -mkdir -p /data/olist/processed

In [3]:
!hadoop fs -ls -h /data/olist/processed

Found 9 items
-rw-r--r--   2 root hadoop          0 2025-04-07 02:51 /data/olist/processed/_SUCCESS
-rw-r--r--   2 root hadoop     27.5 M 2025-04-07 02:50 /data/olist/processed/part-00000-0f178ae0-9c34-4858-96e2-921a6836fffd-c000.snappy.parquet
-rw-r--r--   2 root hadoop     27.4 M 2025-04-07 02:50 /data/olist/processed/part-00001-0f178ae0-9c34-4858-96e2-921a6836fffd-c000.snappy.parquet
-rw-r--r--   2 root hadoop     27.9 M 2025-04-07 02:50 /data/olist/processed/part-00002-0f178ae0-9c34-4858-96e2-921a6836fffd-c000.snappy.parquet
-rw-r--r--   2 root hadoop     27.2 M 2025-04-07 02:50 /data/olist/processed/part-00003-0f178ae0-9c34-4858-96e2-921a6836fffd-c000.snappy.parquet
-rw-r--r--   2 root hadoop     27.5 M 2025-04-07 02:50 /data/olist/processed/part-00004-0f178ae0-9c34-4858-96e2-921a6836fffd-c000.snappy.parquet
-rw-r--r--   2 root hadoop     27.6 M 2025-04-07 02:50 /data/olist/processed/part-00005-0f178ae0-9c34-4858-96e2-921a6836fffd-c000.snappy.parquet
-rw-r--r--   2 root hadoop    

In [None]:
!hdfs dfs -chmod 775 /data/olist/processed


In [2]:
full_order_df.write.mode("overwrite").parquet("hdfs:///data/olist/processed")

NameError: name 'full_order_df' is not defined

In [1]:
spark.stop()