# Notebook Usage

<ul>
<li>Create a complete (gap-free) weekly-level time series of product prices from the Excel files in /data
<li>This data will be visualized in the frontend React app
</ul>

#### Processing logic

1. Load all excels into a single dataframe
2. Create time periods (weeks, months) from start of data to end of data
3. For each product (EAN-koodi), generate the time series
* Max price during time period
4. Make a test visualization before building the frontend functionality

* Consider product grouping, could be useful e.g. if product has only slightly changed


In [1]:
import os
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *


os.environ["PYARROW_IGNORE_TIMEZONE"] = "1"


In [2]:
# Load every Excel report in the data folder into a single pandas DataFrame

# Build the path with os.path.join instead of a hard-coded backslash: "..\data" relied on
# "\d" being an invalid (and therefore literal) escape sequence and only worked on Windows.
file_location = os.path.join("..", "data")

# Collect the per-file frames and concatenate once; concatenating inside the loop
# re-copies all previously loaded rows on every iteration.
excel_frames = []

for file_name in sorted(os.listdir(file_location)):
    # Match the real ".xlsx" extension (case-insensitively) and skip the "~$..." lock
    # files Excel creates next to an open workbook, which cannot be parsed.
    if not file_name.lower().endswith('.xlsx') or file_name.startswith('~$'):
        continue
    print(file_name)
    # skiprows=8: the first eight rows are skipped before the header row is read
    # (presumably report metadata above the table — confirm against the export format).
    df_one_file = pd.read_excel(os.path.join(file_location, file_name), skiprows=8)
    # Remember which report each row came from.
    df_one_file["source"] = file_name
    excel_frames.append(df_one_file)

# Keep the original behaviour of yielding an empty frame when no report is found.
pdf_raw = pd.concat(excel_frames) if excel_frames else pd.DataFrame()
    

ostotietoraportti_aluoto.xlsx
ostotietoraportti_jluoto.xlsx


In [3]:
# Create spark session
# spark = (SparkSession.builder
#          .appName("price_analysis")
#          .config("spark.sql.shuffle.partitions", "1")  # Reduce shuffle partitions for operations like join, groupBy
#          .config("spark.default.parallelism", "4")  # Adjust parallelism; set to a reasonable number for your machine
#          .getOrCreate())

from delta import *
import pyspark

# Options applied to the session builder one by one, in this order.
# NOTE(review): the CSV write further down failed with an UnsatisfiedLinkError in
# NativeIO$Windows — confirm that the Hadoop native binaries under C:/hadoop are
# actually picked up by the JVM.
session_options = {
    "spark.hadoop.hadoop.home.dir": "C:/hadoop",
    "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
    "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    # "spark.default.parallelism": "4",
    # "spark.sql.shuffle.partitions": "1",
}

builder = pyspark.sql.SparkSession.builder.appName("MyApp")
for option_key, option_value in session_options.items():
    builder = builder.config(option_key, option_value)

# Let delta-spark add its package configuration, then start (or reuse) the session.
spark = configure_spark_with_delta_pip(builder).getOrCreate()

In [4]:
# Create spark dataframe
# Convert the combined pandas frame (pdf_raw) into a Spark DataFrame; no explicit schema is given.
raw_data = spark.createDataFrame(pdf_raw)
# Print the first five rows as a quick sanity check of the loaded columns.
raw_data.show(5)

+-------------------+----------+---------------+--------------------+--------------------+-------------+-------------------------+----------------------------+-------------+------------+-------+--------------------+
|         PÄIVÄMÄÄRÄ|KELLONAIKA|    TOIMIPAIKKA|          TUOTERYHMÄ|           TUOTENIMI|    EAN-KOODI|Yksikköhinta € verollinen|Ostetut tuotteet (paino/kpl)|Myyntihinta €|Alennukset €|OSTOT €|              source|
+-------------------+----------+---------------+--------------------+--------------------+-------------+-------------------------+----------------------------+-------------+------------+-------+--------------------+
|2021-06-30 00:00:00|     15:08|PRISMA HYVINKÄÄ|Mehu- ja juoma-an...|10 x Rainbow päär...|6415712500894|                     2.15|                         1.0|         2.15|         0.0|   2.15|ostotietoraportti...|
|2021-06-30 00:00:00|     15:08|PRISMA HYVINKÄÄ|Mehu- ja juoma-an...|10 x Rainbow vade...|6415712500887|                     2.15|      

In [5]:
# Build the calendar of days that every product's price series is aligned to

# Earliest and latest purchase dates, fetched with a single aggregation.
# Note: min/max here are the Spark column functions pulled in by the star import,
# not the Python builtins.
mindate, maxdate = raw_data.agg(min('PÄIVÄMÄÄRÄ'), max('PÄIVÄMÄÄRÄ')).first()

# One entry per calendar day between the two bounds (pandas' default frequency)
date_range = pd.date_range(mindate, maxdate)

# createDataFrame wants an iterable of rows, so wrap each datetime in a 1-tuple
date_list = [(day.to_pydatetime(),) for day in date_range]

# Single non-nullable timestamp column named "Date"
schema = StructType([StructField("Date", TimestampType(), False)])

dates_df = spark.createDataFrame(date_list, schema=schema)

dates_df.show(3)

+-------------------+
|               Date|
+-------------------+
|2019-01-01 00:00:00|
|2019-01-02 00:00:00|
|2019-01-03 00:00:00|
+-------------------+


In [6]:
def price_history_for_product(product_transactions:DataFrame, dates_df:DataFrame) -> DataFrame:
    """
    Build a gap-free daily price history for a single product.

    Input:
        product_transactions: Spark dataframe holding the purchase rows of ONE product.
            Reads the columns 'PÄIVÄMÄÄRÄ', 'Yksikköhinta € verollinen' and 'EAN-KOODI'.
        dates_df: Spark dataframe with a 'Date' column listing every day of the period.
    Output:
        Spark dataframe with the columns 'Date', 'PÄIVÄMÄÄRÄ',
        'Yksikköhinta € verollinen' and 'EAN-KOODI', one row per row of dates_df.

    Fills the gaps in the transactions dataframe: days without a purchase get the
    values of the most recent earlier purchase day (forward fill). 'PÄIVÄMÄÄRÄ' is
    forward-filled as well, so it shows the date of that most recent purchase. Days
    before the first purchase have nothing to fill from and stay null.
    """
    price_col = 'Yksikköhinta € verollinen'

    # Get max price for each day (if multiple rows for one day).
    # The EAN code is taken from the transactions themselves instead of the notebook-level
    # loop variable `ean_code`, so the function no longer depends on hidden global state.
    # All rows belong to one product, hence max() simply returns that product's code.
    product_daily_prices = product_transactions.groupby('PÄIVÄMÄÄRÄ').agg(
        max(price_col).alias(price_col),
        max('EAN-KOODI').alias('EAN-KOODI'),
    )

    # Left join product daily price with the date range so every day is present
    product_daily_prices_full = dates_df.join(
        product_daily_prices,
        product_daily_prices['PÄIVÄMÄÄRÄ'] == dates_df['Date'],
        how='left',
    )

    # Transform spark dataframe to pandas-on-spark dataframe to allow ffill logic;
    # the frame must be in date order for the forward fill to be meaningful.
    ps_product_daily_prices_full = product_daily_prices_full.pandas_api()
    ps_product_daily_prices_full = ps_product_daily_prices_full.sort_values(by='Date')
    ps_product_daily_prices_full = ps_product_daily_prices_full.ffill()

    # Transform pandas-on-spark dataframe back to spark dataframe
    product_daily_prices_full_final = ps_product_daily_prices_full.to_spark()

    return product_daily_prices_full_final


In [7]:
# One row per distinct product code (EAN) found in the raw purchase data

all_products = raw_data.select('EAN-KOODI').dropDuplicates()
product_count = all_products.count()
print(f"Product count: {product_count}")


Product count: 2962


In [8]:
# Bring the distinct EAN codes to the driver as a plain Python list
all_ean_codes = [product_row['EAN-KOODI'] for product_row in all_products.collect()]

In [9]:
# Build the daily price history product by product and stack the results

price_history_all_products = None
# Only the first two products are processed for now
ean_codes = all_ean_codes[:2]

# Kept as a plain top-level loop so that ean_code, product_transactions and
# one_product_history remain notebook-level variables after the cell has run.
for ean_code in ean_codes:
    # Purchase rows of the current product only
    product_transactions = raw_data.where(col('EAN-KOODI') == ean_code)
    # Gap-free daily history for the current product
    one_product_history = price_history_for_product(product_transactions, dates_df)

    # The first history starts the result; every later one is appended with union
    price_history_all_products = (
        one_product_history
        if price_history_all_products is None
        else price_history_all_products.union(one_product_history)
    )





In [10]:

def format_column_name(col_name):
    """
    Convert a whitespace-separated column name into a camelCase-style name.

    The first word is lower-cased, every following word is passed through
    str.capitalize() (first character upper-cased, the rest lower-cased) and the
    words are joined without spaces, e.g. 'Myyntihinta €' -> 'myyntihinta€'.

    Input: column name as a string
    Output: the reformatted name; a name that is empty or consists only of
            whitespace is returned unchanged instead of raising IndexError.
    """
    # Split the column name on whitespace
    words = col_name.split()
    if not words:
        # Nothing to format; returning the input keeps a later rename a no-op
        return col_name
    first_word, *other_words = words
    # Lower-case the first word, capitalize the rest, join without spaces
    return first_word.lower() + ''.join(word.capitalize() for word in other_words)

# Work out the new name of every column first, then rename them one at a time
renamed_columns = {
    existing_name: format_column_name(existing_name)
    for existing_name in price_history_all_products.columns
}
for col_name, new_col_name in renamed_columns.items():
    price_history_all_products = price_history_all_products.withColumnRenamed(col_name, new_col_name)


In [11]:
# price_history_all_products.write.format("delta").mode("overwrite").save("..\data\price_history_all_products_delta")

# Collapse the data to one partition so that Spark produces a single part-file,
# then write it as CSV with a header row, replacing any earlier output at 'test.csv'.
# NOTE(review): the recorded run of this cell failed with java.lang.UnsatisfiedLinkError
# in NativeIO$Windows.access0 — presumably the Hadoop native Windows binaries
# (hadoop.dll / winutils.exe) were not found; confirm the local Hadoop setup.
single_partition = price_history_all_products.coalesce(1)
single_partition.write.csv('test.csv', mode='overwrite', header=True)

Py4JJavaError: An error occurred while calling o409.save.
: java.lang.UnsatisfiedLinkError: 'boolean org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(java.lang.String, int)'
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Native Method)
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access(NativeIO.java:793)
	at org.apache.hadoop.fs.FileUtil.canRead(FileUtil.java:1249)
	at org.apache.hadoop.fs.FileUtil.list(FileUtil.java:1454)
	at org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:601)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1972)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2014)
	at org.apache.hadoop.fs.ChecksumFileSystem.listStatus(ChecksumFileSystem.java:761)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1972)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2014)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.getAllCommittedTaskPaths(FileOutputCommitter.java:334)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJobInternal(FileOutputCommitter.java:404)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:377)
	at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitJob(HadoopMapReduceCommitProtocol.scala:192)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$writeAndCommit$3(FileFormatWriter.scala:275)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:552)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.writeAndCommit(FileFormatWriter.scala:275)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeWrite(FileFormatWriter.scala:304)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:190)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:190)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$executeCollect$1(AdaptiveSparkPlanExec.scala:390)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:418)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:390)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:859)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:388)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:361)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:240)
	at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:1570)


In [ ]:
# Coalesce the DataFrame to ensure only one part-file is created
# price_history_all_products.coalesce(1).write.format('csv').option('header', 'true').save('test.csv')


import matplotlib.pyplot as plt

# Test visualization: price history of the product processed last in the loop above.
# ps_product_daily_prices_full only exists inside price_history_for_product, so the
# notebook-level one_product_history is plotted instead (it still has the original
# column names, unlike the renamed price_history_all_products).
history_pdf = one_product_history.toPandas().sort_values('Date')

fig, ax = plt.subplots()
ax.plot(history_pdf['Date'], history_pdf['Yksikköhinta € verollinen'])
ax.set(
    title='Daily price history (last processed product)',
    xlabel='Date',
    ylabel='Yksikköhinta € verollinen',
)
# Rotate and align the date tick labels so they do not overlap
fig.autofmt_xdate()
    