In [1]:
# Test to check the version of spark and if it is instanciated
spark

23/08/07 21:00:45 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties


### Examples to get you up to speed

Create a database

In [4]:
spark.sql('''create database if not exists dal.test''')

DataFrame[]

In [5]:
spark.sql('''show databases''').show()

+---------+
|namespace|
+---------+
|  default|
|     test|
+---------+



Create a table

In [6]:
# creating test table 
spark.sql('''
create table dal.test.test 
    (
    customer_id varchar(64),
    company_name varchar(64),
    industries varchar(64)
 ) 
 USING iceberg
''')

DataFrame[]

In [7]:
spark.sql('''select * from dal.test.test''').show()

+-----------+------------+----------+
|customer_id|company_name|industries|
+-----------+------------+----------+
+-----------+------------+----------+



In [8]:
spark.sql("insert into dal.test.test values('1','2','3')")

                                                                                

DataFrame[]

In [10]:
df=spark.sql("select * from dal.test.test").show()

+-----------+------------+----------+
|customer_id|company_name|industries|
+-----------+------------+----------+
|          1|           2|         3|
+-----------+------------+----------+



[Stage 1:>                                                          (0 + 1) / 1]                                                                                

In [11]:
spark.sql('''create database if not exists dal.assignment''')

DataFrame[]

In [21]:
minio_access_key = "mini0"
minio_secret_key = "minio123"

spark.conf.set("spark.hadoop.fs.s3a.access.key", minio_access_key)
spark.conf.set("spark.hadoop.fs.s3a.secret.key", minio_secret_key)

minio_endpoint = "http://minio:9000" 
minio_bucket = "demo-data"
hist_data = "historical_orders.json"
recent_data = "recent_orders.json"
cust_data = "Customers.csv"


# Read data from MinIO

hist_data_path = f"s3a://{minio_bucket}/{hist_data}"
df_hist = spark.read.format("json").load(hist_data_path)

recent_data_path = f"s3a://{minio_bucket}/{recent_data}"
df_recent = spark.read.format("json").load(recent_data_path)

cust_data_path = f"s3a://{minio_bucket}/{cust_data}"
df_cust = spark.read.format("csv")\
    .option("header","true")\
    .option("inferSchema","true")\
    .load(cust_data_path)

df_hist.write.mode("overwrite").saveAsTable("assignment.historical_orders")
df_recent.write.mode("overwrite").saveAsTable("assignment.recent_orders")
df_cust.write.mode("overwrite").saveAsTable("assignment.customers")

#df_hist.show(5)
#df_recent.show(5)
#df_cust.show(5)



                                                                                

In [None]:
# Step 4: Explode the order_lines array to get individual rows for each product
#order_lines_df = combined_df.withColumn("order_line", explode(col("order_lines"))).drop("order_lines")


# Step 6: Explode the specialized_industries array to get individual rows for each industry
#customer_industries_df = customer_df.withColumn("specialized_industry", explode(split(col("specialized_industries"), ";"))).drop("specialized_industries")


In [65]:
#from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, sum, split, expr
from pyspark.sql.window import Window

# Step 1: Create a SparkSession
#spark = SparkSession.builder.appName("IndustryFluctuation").getOrCreate()

historical_df = spark.sql("select * from dal.assignment.historical_orders")


In [66]:
recent_df = spark.sql("select * from dal.assignment.recent_orders")

# Step 3: Combine historical and recent order data into a single DataFrame
combined_orig_df = historical_df.union(recent_df)
combined_df = combined_orig_df.dropDuplicates(["order_id"])

# Step 5: Load customer data from customer.csv and de-duplicate it based on customer_id
customer_orig_df = spark.sql("select * from dal.assignment.customers")
customer_df = customer_orig_df.dropDuplicates(["customer_id"])


In [52]:
from pyspark.sql.functions import max, date_sub
from datetime import datetime,timedelta

combined_date_cast_df = combined_df.withColumn("timestamp", col("timestamp").cast("timestamp"))

# Find the maximum timestamp date
max_date = combined_date_cast_df.select(max(col("timestamp"))).first()[0]
one_day_before = max_date - timedelta(days=1)
month_before = max_date - timedelta(days=30)

print(max_date)
print(one_day_before)
print(month_before)




2023-03-20 23:58:19.767981
2023-03-19 23:58:19.767981
2023-02-18 23:58:19.767981


In [41]:

# Step 6: Join customer data with order data and de-duplicate it based on order_id
order_with_customer_df = combined_date_cast_df.join(customer_df, on="customer_id")

# Step 7: Explode the specialized_industries array to get individual rows for each industry
customer_industries_df = order_with_customer_df.withColumn("specialized_industry", explode(split(col("specialized_industries"), ";"))).drop("specialized_industries")


In [43]:
customer_industries_df.show(5)

[Stage 62:>                                                         (0 + 1) / 1]

+--------------------+------------------+--------------------+--------------------+--------------------+------------+--------------------+
|         customer_id|            amount|            order_id|         order_lines|           timestamp|company_name|specialized_industry|
+--------------------+------------------+--------------------+--------------------+--------------------+------------+--------------------+
|06ec6a8a-c3d9-4b0...|         244413.25|00007ef8-0255-47e...|[{97.71, 4b688bad...| 2022-03-13 19:48:00|     Skilith|             Colours|
|54920ba7-09fa-427...|          81861.76|000670bb-8405-456...|[{90.77, 1a4d50ec...|2022-03-07 02:04:...|    Innotype|             Polymer|
|0674efa6-ea2c-431...|         126424.82|0007216d-2e34-4df...|[{49.23, 7f96c322...|2022-04-29 21:26:...|     Zoombox|        Construction|
|b9b1b6d9-4e0e-457...|29282.679999999997|00097d87-39f7-4a8...|[{6.15, 763bcbb2-...|2022-11-08 06:40:...|     Wordify|           Cosmetics|
|b9b1b6d9-4e0e-457...|29282

                                                                                

In [26]:
from pyspark.sql.functions import avg


# Calculate the average amount per order_id
average_amount_by_order = customer_industries_df.groupBy("order_id").agg(avg("amount").alias("average_amount"))

# Show the result
average_amount_by_order.show(5)





+--------------------+------------------+
|            order_id|    average_amount|
+--------------------+------------------+
|0152349b-ff99-47e...| 81030.42000000001|
|01ca462b-3313-4aa...|          88546.96|
|01da1165-ec2d-48e...| 94298.34000000001|
|031b4b25-f9de-475...|240275.78999999992|
|0370b16a-9dab-4a5...|129972.78000000003|
+--------------------+------------------+
only showing top 5 rows



                                                                                

In [53]:
# Step 8: Calculate Industry-Wise Daily Revenue for the last 24 hours
last_24_hours_df = customer_industries_df.filter(col("timestamp") >= one_day_before)
last_30_days_df = customer_industries_df.filter(col("timestamp") >= month_before)

In [54]:
print(customer_industries_df.count())
print(last_24_hours_df.count())
print(last_30_days_df.count())

57261
889
1446




In [61]:
industry_daily_revenue_df = last_24_hours_df.groupBy("specialized_industry").agg(sum(col("amount")).alias("daily_revenue"))
industry_monthly_revenue_df = last_30_days_df.groupBy("specialized_industry").agg(avg(col("amount")).alias("monthly_avg_revenue"))

In [56]:
industry_daily_revenue_df.show()
industry_monthly_revenue_df.show()

                                                                                

+--------------------+--------------------+
|specialized_industry|       daily_revenue|
+--------------------+--------------------+
|                Food|1.1371096509999996E7|
|             Colours|           5773935.9|
|             Polymer|1.4475883969999995E7|
|             Oil&Gas|   5559699.649999999|
|        Construction|1.5049691109999998E7|
|         Agriculture|1.1477563790000005E7|
|           Cosmetics|1.6000598300000012E7|
|            Cleaning|   6498867.699999999|
|          Lubricants|   7030005.009999999|
|     Pharmaceuticals| 1.543688640000001E7|
|             Ploymer|  2253577.7300000004|
+--------------------+--------------------+

+--------------------+--------------------+
|specialized_industry|     monthly_revenue|
+--------------------+--------------------+
|                Food|       1.944055448E7|
|             Colours|1.1263576360000003E7|
|             Polymer| 2.347341588000001E7|
|             Oil&Gas|       1.072537522E7|
|        Construction|2.6396147

In [62]:
industry_monthly_revenue_df.show()

+--------------------+-------------------+
|specialized_industry|monthly_avg_revenue|
+--------------------+-------------------+
|                Food| 131355.09783783785|
|             Colours| 122430.17782608699|
|             Polymer| 124197.96761904767|
|             Oil&Gas| 130797.25878048781|
|        Construction| 126904.55331730771|
|         Agriculture| 131525.34911111108|
|           Cosmetics|  129814.2287755103|
|            Cleaning| 129873.53585106388|
|          Lubricants| 135455.80265060242|
|     Pharmaceuticals| 129179.87967567577|
|             Ploymer|  131530.5464705883|
+--------------------+-------------------+



In [64]:
industry_fluctuation_df = industry_daily_revenue_df.join(industry_monthly_revenue_df, on="specialized_industry")
industry_fluctuation_cal_df = industry_fluctuation_df.withColumn("fluctuation", col("daily_revenue") - col("monthly_avg_revenue"))

# Step 11: Identify Top 3 Industries with the Highest Fluctuations (Positive or Negative)
top_3_industries_df = industry_fluctuation_cal_df.orderBy(col("fluctuation").desc()).limit(3)

# Step 12: Show the results
top_3_industries_df.show()




+--------------------+--------------------+-------------------+--------------------+
|specialized_industry|       daily_revenue|monthly_avg_revenue|         fluctuation|
+--------------------+--------------------+-------------------+--------------------+
|           Cosmetics|1.6000598300000012E7|  129814.2287755103|1.5870784071224501E7|
|     Pharmaceuticals| 1.543688640000001E7| 129179.87967567577|1.5307706520324335E7|
|        Construction|1.5049691109999998E7| 126904.55331730771| 1.492278655668269E7|
+--------------------+--------------------+-------------------+--------------------+



                                                                                

In [25]:
from pyspark.sql.window import Window
from pyspark.sql.functions import col, expr
from datetime import datetime, timedelta
from pyspark.sql.functions import udf
from pyspark.sql.types import LongType

end_date = datetime.now()
#start_date_30_days = end_date - timedelta(days=30)
#start_date_1_day = end_date - timedelta(days=1)

# Define the time intervals
interval_30_days = timedelta(days=30)
interval_1_day = timedelta(days=1)

def calculate_boundary(interval):
    return -interval.total_seconds()

# Register the UDF
calculate_boundary_udf = udf(calculate_boundary, LongType())



# Step 9: Calculate Industry-Wise Average Revenue for the past 30 days
#window_spec = Window.partitionBy("specialized_industry").orderBy("timestamp").rangeBetween(expr("-interval 30 days"), expr("-interval 1 day"))\
#window_spec = Window.partitionBy("specialized_industry").orderBy("timestamp") \
#                    .rangeBetween(-int((end_date - start_date_30_days).total_seconds()), -1)
#window_spec = Window.partitionBy("specialized_industry").orderBy("timestamp") \
#                    .rangeBetween(-interval_30_days, -interval_1_day)
#window_spec = Window.partitionBy("specialized_industry").orderBy("timestamp") \
#                    .rangeBetween(calculate_boundary_udf(interval_30_days), calculate_boundary_udf(interval_1_day))\

window_spec = Window.partitionBy("specialized_industry").orderBy("timestamp") \
                    .rangeBetween(calculate_boundary_udf(interval_30_days), calculate_boundary_udf(interval_1_day))

industry_avg_revenue_df = order_with_customer_df.withColumn("avg_revenue_last_30_days", sum(col("order_line.volume") * col("order_line.price")).over(window_spec))

# Step 10: Calculate Fluctuation for each Industry
#industry_fluctuation_df = industry_daily_revenue_df.join(industry_avg_revenue_df, on="specialized_industry")
#industry_fluctuation_df = industry_fluctuation_df.withColumn("fluctuation", col("daily_revenue") - col("avg_revenue_last_30_days"))

# Step 11: Identify Top 3 Industries with the Highest Fluctuations (Positive or Negative)
#top_3_industries_df = industry_fluctuation_df.orderBy(col("fluctuation").desc()).limit(3)

# Step 12: Show the results
#top_3_industries_df.show()

# Step 13: Stop the SparkSession
#spark.stop()

TypeError: Invalid argument, not a string or column: 30 days, 0:00:00 of type <class 'datetime.timedelta'>. For column literals, use 'lit', 'array', 'struct' or 'create_map' function.