# Task: Find price of the item of the last 2 concecutive mondays 

## Imports

In [1]:
import csv
from datetime import datetime
import pandas as pd

## Solution Python

### Read the data

In [18]:
file_path = "Data/products.txt"  # Replace with your file path

In [19]:
df = pd.read_csv(file_path)
df.head()

Unnamed: 0,product_id,timestamp,amount
0,1,2025-01-02,23.45
1,2,2025-01-03,34.56
2,3,2025-01-04,45.67
3,4,2025-01-05,56.78
4,1,2025-01-06,67.89


In [20]:
sorted_df = df.sort_values(by=['product_id', 'timestamp'])
sorted_df.head(5)

Unnamed: 0,product_id,timestamp,amount
0,1,2025-01-02,23.45
36,1,2025-01-03,67.89
4,1,2025-01-06,67.89
40,1,2025-01-07,23.45
8,1,2025-01-10,23.45


In [22]:
sorted_df.dtypes

product_id      int64
timestamp      object
amount        float64
dtype: object

In [25]:
# Convert timestamp to datetime
sorted_df['timestamp'] = pd.to_datetime(df['timestamp'])

In [26]:
sorted_df.dtypes

product_id             int64
timestamp     datetime64[ns]
amount               float64
dtype: object

### Get dates of last 2 mondays

In [17]:
from datetime import date, timedelta

def get_last_two_mondays():
    # Get current date
    current_date = date.today()
    
    # Calculate days since last Monday
    days_since_monday = (current_date.weekday() - 0) % 7  # Monday is weekday 0
    
    # Find last Monday (if today is Monday, go back 7 days)
    last_monday = current_date - timedelta(days=days_since_monday or 7)
    
    # Find the previous Monday (7 days before last Monday)
    previous_monday = last_monday - timedelta(days=7)
    
    return current_date, previous_monday, last_monday

# Get dates
current_date, first_monday, second_monday = get_last_two_mondays()

# Print results
print(f"Current Date: {current_date}")
print(f"Last Two Consecutive Mondays:")
print(f"1. {first_monday} (Week Prior)")
print(f"2. {second_monday} (Most Recent)")

Current Date: 2025-02-05
Last Two Consecutive Mondays:
1. 2025-01-27 (Week Prior)
2. 2025-02-03 (Most Recent)


### Solution

In [28]:
# Group by product_id and sort each group by timestamp
grouped = df.sort_values(['product_id', 'timestamp']).groupby('product_id')
grouped.head()

Unnamed: 0,product_id,timestamp,amount
0,1,2025-01-02,23.45
36,1,2025-01-03,67.89
4,1,2025-01-06,67.89
40,1,2025-01-07,23.45
8,1,2025-01-10,23.45
1,2,2025-01-03,34.56
37,2,2025-01-04,78.9
5,2,2025-01-07,78.9
41,2,2025-01-08,34.56
9,2,2025-01-11,34.56


    Dynamic Monday Calculation: Automatically finds the last two Mondays relative to the latest date in your dataset

    Fallback Mechanism: If no data exists on a Monday, finds the closest prior available date

    Clear Reporting: Shows both the target Mondays and actual dates used for pricing

    Handles All Products: Works for any number of products in your dataset



    Reads and processes your product data

    Identifies the two most recent Mondays in your data timeline

    For each product, finds the closest available price data for those Mondays

    Provides a clear report showing both the target dates and actual dates used

In [30]:
# Get the latest date in the dataset
max_date = df["timestamp"].max()

# Function to find the last two Mondays relative to a reference date
def get_last_two_mondays(reference_date):
    ref_date = pd.Timestamp(reference_date)
    days_since_monday = (ref_date.weekday() - 0) % 7  # Monday = 0
    
    last_monday = ref_date - pd.DateOffset(days=days_since_monday)
    if days_since_monday == 0:
        last_monday = ref_date - pd.DateOffset(weeks=1)
    
    previous_monday = last_monday - pd.DateOffset(weeks=1)
    return [previous_monday.date(), last_monday.date()]

# Get the last two Mondays
last_two_mondays = get_last_two_mondays(max_date)

# Prepare results
results = []
for product_id, group in df.groupby("product_id"):
    group = group.sort_values("timestamp")
    
    for monday in last_two_mondays:
        # Filter records up to and including the Monday
        mask = group["timestamp"] <= pd.Timestamp(monday)
        filtered = group[mask]
        
        if not filtered.empty:
            latest_record = filtered.iloc[-1]
            results.append({
                "product_id": product_id,
                "target_monday": monday,
                "found_date": latest_record["timestamp"].date(),
                "amount": latest_record["amount"]
            })
        else:
            results.append({
                "product_id": product_id,
                "target_monday": monday,
                "found_date": None,
                "amount": None
            })

# Convert to DataFrame and format output
result_df = pd.DataFrame(results)
pivot_table = result_df.pivot(
    index="product_id",
    columns="target_monday",
    values=["amount", "found_date"]
)

# Print human-readable results
print("Last Two Mondays Analyzed:", last_two_mondays)
print("\nProduct Price Report:")
for product_id in df["product_id"].unique():
    product_data = result_df[result_df["product_id"] == product_id]
    print(f"\nProduct {product_id}:")
    for _, row in product_data.iterrows():
        print(f"• For Monday {row['target_monday']}:")
        print(f"  Found amount {row['amount']} (from {row['found_date']})")

Last Two Mondays Analyzed: [datetime.date(2025, 1, 27), datetime.date(2025, 2, 3)]

Product Price Report:

Product 1:
• For Monday 2025-01-27:
  Found amount 67.89 (from 2025-01-27)
• For Monday 2025-02-03:
  Found amount 23.45 (from 2025-02-03)

Product 2:
• For Monday 2025-01-27:
  Found amount 34.56 (from 2025-01-27)
• For Monday 2025-02-03:
  Found amount 34.56 (from 2025-02-01)

Product 3:
• For Monday 2025-01-27:
  Found amount 45.67 (from 2025-01-25)
• For Monday 2025-02-03:
  Found amount 45.67 (from 2025-02-02)

Product 4:
• For Monday 2025-01-27:
  Found amount 56.78 (from 2025-01-26)
• For Monday 2025-02-03:
  Found amount 56.78 (from 2025-02-03)


## Spark Solution

### Importing Libraries

In [31]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date, max as max_spark, lit, when
from pyspark.sql.window import Window
from datetime import datetime, timedelta

In [35]:
# Initialize Spark session
spark = SparkSession.builder.appName("ProductPriceAnalysis").getOrCreate()

# 1. Read and prepare data
df = spark.read.csv("Data/products.txt", header=True, inferSchema=True) \
    .withColumn("timestamp", to_date(col("timestamp"), "yyyy-MM-dd"))



In [37]:
# 2. Find reference Mondays based on latest date
latest_date = df.agg(max_spark("timestamp")).collect()[0][0]
latest_date

datetime.date(2025, 2, 10)

In [39]:
def get_mondays(ref_date):
    """Calculate last two Mondays relative to reference date"""
    ref_date = ref_date.date() if isinstance(ref_date, datetime) else ref_date
    days_since_monday = (ref_date.weekday() - 0) % 7  # Monday = 0 in Python
    last_monday = ref_date - timedelta(days=days_since_monday)
    if days_since_monday == 0:
        last_monday -= timedelta(weeks=1)
    return [last_monday - timedelta(weeks=1), last_monday]

mondays = get_mondays(latest_date)
mondays

[datetime.date(2025, 1, 27), datetime.date(2025, 2, 3)]

In [41]:
# 3. Create DataFrames for Mondays and products
mondays_df = spark.createDataFrame(
    [(m.strftime("%Y-%m-%d"),) for m in mondays], 
    ["target_monday"]
).withColumn("target_monday", to_date(col("target_monday"), "yyyy-MM-dd"))
mondays_df

DataFrame[target_monday: date]

In [43]:
products_df = df.select("product_id").distinct()
products_df

DataFrame[product_id: int]

In [45]:
# 4. Find closest prices for each product-Monday combination
result = products_df.crossJoin(mondays_df) \
    .join(
        df.withColumnRenamed("timestamp", "price_date"),
        (col("product_id") == col("product_id")) & 
        (col("price_date") <= col("target_monday")),
        "left"
    ) \
    .groupBy("product_id", "target_monday") \
    .agg(
        max_spark("price_date").alias("found_date"),
        max_spark(col("amount").cast("double")).alias("amount")
    ) \
    .orderBy("product_id", "target_monday")

# 5. Format and show results
final_df = result.withColumn("amount", 
    when(col("found_date").isNull(), None).otherwise(col("amount"))
)

print(f"Reference Mondays: {[m.strftime('%Y-%m-%d') for m in mondays]}")
final_df.show(truncate=False)

AnalysisException: [AMBIGUOUS_REFERENCE] Reference `product_id` is ambiguous, could be: [`product_id`, `product_id`].