# BIG DATA ANALYTICS AND RNN IN BANANA PRICE FORECASTING

## STOP ANY ACTIVE SPARK SESSION

In [1]:
from pyspark.sql import SparkSession

# Stop all active Spark sessions
SparkSession.builder.getOrCreate().stop()


## Import all the necessary libraries

In [2]:
import warnings
warnings.filterwarnings("ignore")
from pyspark.ml import Pipeline
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

In [3]:
#List the contents of the root directory in HDFS
!hdfs dfs -ls /

Found 5 items
-rwxrwxrwx   1 hduser supergroup    1114475 2024-03-27 10:39 /CPM04.20240327101259.csv
-rwxrwxrwx   1 hduser supergroup    1197604 2024-03-27 10:39 /CPM12.20240327101308.csv
drwxr-xr-x   - hduser supergroup          0 2024-03-07 10:06 /output1
drwxr-xr-x   - hduser supergroup          0 2024-03-07 13:14 /output2
drwxr-xr-x   - hduser supergroup          0 2024-03-14 14:03 /user1


## Preprocessing Data using Spark
This process involves a series of steps. They include:-
1. Loading 2 CSV files on national average prices of food data stored in hadoop.

2. Data Exploration: Perform exploratory data analysis (EDA) 

3. Data integration, merging the two csv datasets

4. Data Export: Export the preprocessed data to hadoop for storage

## Initialize Spark Session

In [4]:
spark = SparkSession.builder \
    .appName("Hadoop to python") \
    .getOrCreate()

## Step one: Load the data from hadoop using spark

In [5]:
#The two datasets hadoop path
dataset_one = "hdfs://localhost:9000/CPM04.20240327101259.csv"
dataset_two = "hdfs://localhost:9000/CPM12.20240327101308.csv"

#Create spark DataFrames for the two datasets
df_one = spark.read.csv(dataset_one, header=True, inferSchema=True)
df_two = spark.read.csv(dataset_two, header=True, inferSchema=True)

#View the two DataFRames
df_one, df_two

                                                                                

(DataFrame[STATISTIC: string, Statistic Label: string, C02363V02844: int, Consumer Item: string, TLIST(M1): int, Month: string, UNIT: string, VALUE: double],
 DataFrame[STATISTIC: string, STATISTIC Label: string, TLIST(M1): int, Month: string, C02363V03422: int, Consumer Item: string, UNIT: string, VALUE: double])

## Step Two: Performing Explorartory Data Analysis (EDA)

In [6]:
#View the first two observations of dataset one
df_one.head(2)

[Row(STATISTIC='CPM04', Statistic Label='National Average  Price', C02363V02844=10010, Consumer Item='Round steak per kg.', TLIST(M1)=200112, Month='2001M12', UNIT='Euro', VALUE=8.414),
 Row(STATISTIC='CPM04', Statistic Label='National Average  Price', C02363V02844=10010, Consumer Item='Round steak per kg.', TLIST(M1)=200201, Month='2002M01', UNIT='Euro', VALUE=8.696)]

In [7]:
#View the first two observations fo dataset two 
df_two.head(2)

[Row(STATISTIC='CPM12', STATISTIC Label='National Average Price', TLIST(M1)=201112, Month='2011 December', C02363V03422=10020, Consumer Item='White, self raising flour per 2 kg', UNIT='Euro', VALUE=None),
 Row(STATISTIC='CPM12', STATISTIC Label='National Average Price', TLIST(M1)=201112, Month='2011 December', C02363V03422=10030, Consumer Item='Brown, wholemeal flour per 2 kg', UNIT='Euro', VALUE=None)]

In [8]:
#Get the Schema for dataset one
df_one.printSchema()

root
 |-- STATISTIC: string (nullable = true)
 |-- Statistic Label: string (nullable = true)
 |-- C02363V02844: integer (nullable = true)
 |-- Consumer Item: string (nullable = true)
 |-- TLIST(M1): integer (nullable = true)
 |-- Month: string (nullable = true)
 |-- UNIT: string (nullable = true)
 |-- VALUE: double (nullable = true)



## Findings
Shows there is missing data in all the variables

In [9]:
#Get the Schema for dataset two
df_two.printSchema()

root
 |-- STATISTIC: string (nullable = true)
 |-- STATISTIC Label: string (nullable = true)
 |-- TLIST(M1): integer (nullable = true)
 |-- Month: string (nullable = true)
 |-- C02363V03422: integer (nullable = true)
 |-- Consumer Item: string (nullable = true)
 |-- UNIT: string (nullable = true)
 |-- VALUE: double (nullable = true)



## Findings
SHows there is missing data in all the variables

In [10]:
#Check the cahracteristics of the datasets
df_one.describe(), df_two.describe()

(DataFrame[summary: string, STATISTIC: string, Statistic Label: string, C02363V02844: string, Consumer Item: string, TLIST(M1): string, Month: string, UNIT: string, VALUE: string],
 DataFrame[summary: string, STATISTIC: string, STATISTIC Label: string, TLIST(M1): string, Month: string, C02363V03422: string, Consumer Item: string, UNIT: string, VALUE: string])

In [11]:
# Count the number of observations
dataone_count = df_one.count()

print("Number of observations:", dataone_count)

Number of observations: 10527


In [12]:
# Count the number of observations
datatwo_count = df_two.count()

print("Number of observations:", datatwo_count)

Number of observations: 11172


# Delete variables that will be unused

In [13]:
df_one.show()

+---------+--------------------+------------+-------------------+---------+-------+----+-----+
|STATISTIC|     Statistic Label|C02363V02844|      Consumer Item|TLIST(M1)|  Month|UNIT|VALUE|
+---------+--------------------+------------+-------------------+---------+-------+----+-----+
|    CPM04|National Average ...|       10010|Round steak per kg.|   200112|2001M12|Euro|8.414|
|    CPM04|National Average ...|       10010|Round steak per kg.|   200201|2002M01|Euro|8.696|
|    CPM04|National Average ...|       10010|Round steak per kg.|   200202|2002M02|Euro|8.643|
|    CPM04|National Average ...|       10010|Round steak per kg.|   200203|2002M03|Euro|8.791|
|    CPM04|National Average ...|       10010|Round steak per kg.|   200204|2002M04|Euro|8.599|
|    CPM04|National Average ...|       10010|Round steak per kg.|   200205|2002M05|Euro|8.607|
|    CPM04|National Average ...|       10010|Round steak per kg.|   200206|2002M06|Euro|8.742|
|    CPM04|National Average ...|       10010|Round

In [14]:
#Drop all columns except month, consumer item and VALUE)
df_one = df_one.drop("STATISTIC","STATISTIC Label","TLIST(M1)","UNIT","C02363V02844")
df_one.show()

+-------------------+-------+-----+
|      Consumer Item|  Month|VALUE|
+-------------------+-------+-----+
|Round steak per kg.|2001M12|8.414|
|Round steak per kg.|2002M01|8.696|
|Round steak per kg.|2002M02|8.643|
|Round steak per kg.|2002M03|8.791|
|Round steak per kg.|2002M04|8.599|
|Round steak per kg.|2002M05|8.607|
|Round steak per kg.|2002M06|8.742|
|Round steak per kg.|2002M07|8.733|
|Round steak per kg.|2002M08|8.806|
|Round steak per kg.|2002M09|8.696|
|Round steak per kg.|2002M10|8.838|
|Round steak per kg.|2002M11| 8.79|
|Round steak per kg.|2002M12|8.887|
|Round steak per kg.|2003M01|  8.9|
|Round steak per kg.|2003M02|8.399|
|Round steak per kg.|2003M03|8.876|
|Round steak per kg.|2003M04| 8.87|
|Round steak per kg.|2003M05|8.669|
|Round steak per kg.|2003M06|8.263|
|Round steak per kg.|2003M07|8.899|
+-------------------+-------+-----+
only showing top 20 rows



In [15]:
df_two.show()

+---------+--------------------+---------+-------------+------------+--------------------+----+-----+
|STATISTIC|     STATISTIC Label|TLIST(M1)|        Month|C02363V03422|       Consumer Item|UNIT|VALUE|
+---------+--------------------+---------+-------------+------------+--------------------+----+-----+
|    CPM12|National Average ...|   201112|2011 December|       10020|White, self raisi...|Euro| null|
|    CPM12|National Average ...|   201112|2011 December|       10030|Brown, wholemeal ...|Euro| null|
|    CPM12|National Average ...|   201112|2011 December|       10040|Bread, white slic...|Euro| null|
|    CPM12|National Average ...|   201112|2011 December|       10050|Bread, brown slic...|Euro| null|
|    CPM12|National Average ...|   201112|2011 December|       10190|  Spaghetti per 500g|Euro| null|
|    CPM12|National Average ...|   201112|2011 December|       10280|Sirloin steak per kg|Euro| null|
|    CPM12|National Average ...|   201112|2011 December|       10290|Striploin ste

In [16]:
#Drop all columns except month, consumer item and VALUE)
df_two = df_two.drop("STATISTIC","STATISTIC Label","TLIST(M1)","UNIT","C02363V03422")
df_two.show()

+-------------+--------------------+-----+
|        Month|       Consumer Item|VALUE|
+-------------+--------------------+-----+
|2011 December|White, self raisi...| null|
|2011 December|Brown, wholemeal ...| null|
|2011 December|Bread, white slic...| null|
|2011 December|Bread, brown slic...| null|
|2011 December|  Spaghetti per 500g| null|
|2011 December|Sirloin steak per kg| null|
|2011 December|Striploin steak p...| null|
|2011 December|Roast beef - tops...| null|
|2011 December|Sliced / diced be...| null|
|2011 December|Pork loin chops p...| null|
|2011 December|   Pork steak per kg| null|
|2011 December|Lamb - whole leg ...| null|
|2011 December|Lamb loin chops p...| null|
|2011 December|Lamb gigot chops ...| null|
|2011 December|Uncooked chicken ...| null|
|2011 December| Lamb's liver per kg| null|
|2011 December|   Ham fillet per kg| null|
|2011 December|   Cooked ham per kg| null|
|2011 December|Best back rashers...| null|
|2011 December|Pork sausages per kg| null|
+----------

## Drop from consumer Item, everything except bananas

In [17]:
bananaprice_one = df_one.filter(df_one["Consumer Item"] == "Bananas per kg.")
bananaprice_two = df_two.filter(df_two["Consumer Item"] == "Bananas per kg")

In [18]:
bananaprice_one.show(), bananaprice_two.show()

+---------------+-------+-----+
|  Consumer Item|  Month|VALUE|
+---------------+-------+-----+
|Bananas per kg.|2001M12|1.685|
|Bananas per kg.|2002M01|1.691|
|Bananas per kg.|2002M02| 1.67|
|Bananas per kg.|2002M03|1.496|
|Bananas per kg.|2002M04|1.669|
|Bananas per kg.|2002M05|1.623|
|Bananas per kg.|2002M06|1.652|
|Bananas per kg.|2002M07|1.652|
|Bananas per kg.|2002M08|1.664|
|Bananas per kg.|2002M09|1.677|
|Bananas per kg.|2002M10|1.694|
|Bananas per kg.|2002M11|1.694|
|Bananas per kg.|2002M12|1.693|
|Bananas per kg.|2003M01|1.674|
|Bananas per kg.|2003M02|1.671|
|Bananas per kg.|2003M03|1.489|
|Bananas per kg.|2003M04| 1.62|
|Bananas per kg.|2003M05|1.513|
|Bananas per kg.|2003M06| 1.42|
|Bananas per kg.|2003M07|1.459|
+---------------+-------+-----+
only showing top 20 rows

+--------------+--------------+-----+
|         Month| Consumer Item|VALUE|
+--------------+--------------+-----+
| 2011 December|Bananas per kg| null|
|  2012 January|Bananas per kg| 1.33|
| 2012 February|

(None, None)

## Step Three: Data Integration. Merging the two datasets into one

In [19]:
# Rename columns in df1 to match columns in df2
rename_mapping = {
    "VALUE": "National Average Price(Euros)"
    # Add more mappings as needed
}
for old_col, new_col in rename_mapping.items():
    bananaprice_one = bananaprice_one.withColumnRenamed(old_col, new_col)

In [20]:
# Rename columns in df1 to match columns in df2
rename_mapping = {
    "VALUE":"National Average Price(Euros)"
}
for old_col, new_col in rename_mapping.items():
    bananaprice_two = bananaprice_two.withColumnRenamed(old_col, new_col)

In [21]:
bananaprice_one.show(), bananaprice_two.show()

+---------------+-------+-----------------------------+
|  Consumer Item|  Month|National Average Price(Euros)|
+---------------+-------+-----------------------------+
|Bananas per kg.|2001M12|                        1.685|
|Bananas per kg.|2002M01|                        1.691|
|Bananas per kg.|2002M02|                         1.67|
|Bananas per kg.|2002M03|                        1.496|
|Bananas per kg.|2002M04|                        1.669|
|Bananas per kg.|2002M05|                        1.623|
|Bananas per kg.|2002M06|                        1.652|
|Bananas per kg.|2002M07|                        1.652|
|Bananas per kg.|2002M08|                        1.664|
|Bananas per kg.|2002M09|                        1.677|
|Bananas per kg.|2002M10|                        1.694|
|Bananas per kg.|2002M11|                        1.694|
|Bananas per kg.|2002M12|                        1.693|
|Bananas per kg.|2003M01|                        1.674|
|Bananas per kg.|2003M02|                       

(None, None)

In [22]:
# Selecting columns in the same order
bananaprice_one = bananaprice_one.select("Consumer Item", "Month", "National Average Price(Euros)")
bananaprice_two = bananaprice_two.select("Consumer Item", "Month", "National Average Price(Euros)")

# Performing the union operation
merged_df = bananaprice_one.union(bananaprice_two)

# Displaying the results
merged_df.show(truncate=False)


+---------------+-------+-----------------------------+
|Consumer Item  |Month  |National Average Price(Euros)|
+---------------+-------+-----------------------------+
|Bananas per kg.|2001M12|1.685                        |
|Bananas per kg.|2002M01|1.691                        |
|Bananas per kg.|2002M02|1.67                         |
|Bananas per kg.|2002M03|1.496                        |
|Bananas per kg.|2002M04|1.669                        |
|Bananas per kg.|2002M05|1.623                        |
|Bananas per kg.|2002M06|1.652                        |
|Bananas per kg.|2002M07|1.652                        |
|Bananas per kg.|2002M08|1.664                        |
|Bananas per kg.|2002M09|1.677                        |
|Bananas per kg.|2002M10|1.694                        |
|Bananas per kg.|2002M11|1.694                        |
|Bananas per kg.|2002M12|1.693                        |
|Bananas per kg.|2003M01|1.674                        |
|Bananas per kg.|2003M02|1.671                  

In [23]:
merged_df.show(n=merged_df.count(), truncate=False)

+---------------+--------------+-----------------------------+
|Consumer Item  |Month         |National Average Price(Euros)|
+---------------+--------------+-----------------------------+
|Bananas per kg.|2001M12       |1.685                        |
|Bananas per kg.|2002M01       |1.691                        |
|Bananas per kg.|2002M02       |1.67                         |
|Bananas per kg.|2002M03       |1.496                        |
|Bananas per kg.|2002M04       |1.669                        |
|Bananas per kg.|2002M05       |1.623                        |
|Bananas per kg.|2002M06       |1.652                        |
|Bananas per kg.|2002M07       |1.652                        |
|Bananas per kg.|2002M08       |1.664                        |
|Bananas per kg.|2002M09       |1.677                        |
|Bananas per kg.|2002M10       |1.694                        |
|Bananas per kg.|2002M11       |1.694                        |
|Bananas per kg.|2002M12       |1.693                  

In [24]:
from pyspark.sql.functions import regexp_replace, lit

# Define a dictionary mapping abbreviated month/year strings to full names
month_year_mapping = {
    "M01": "January", "M02": "February", "M03": "March", "M04": "April", "M05": "May", "M06": "June",
    "M07": "July", "M08": "August", "M09": "September", "M10": "October", "M11": "November", "M12": "December"
}

# Create a regular expression pattern to match month/year strings
pattern = "|".join([f"({year}M{month})" for year in range(2001, 2012) for month in range(1, 13)])

# Replace values in the "Month" column using regexp_replace
merged_df = merged_df.withColumn("Month", regexp_replace("Month", pattern, 
                            lambda x: lit(f"{x[:4]} {month_year_mapping[x[5:]]}")))

# Show the DataFrame
merged_df.show(truncate=False)



TypeError: Invalid argument, not a string or column: <function <lambda> at 0x70de62fcf250> of type <class 'function'>. For column literals, use 'lit', 'array', 'struct' or 'create_map' function.

In [None]:
# Count the number of observations
mergeddata_count = merged_df.count()

print("Number of observations:", mergeddata_count)

In [None]:
merged_df.describe()

## EXPORT THE PREPROCESSED DATA TO HADOOP FOR STORAGE

In [None]:
# Export preprocessed data
preprocessed_data_path = "hdfs://<hadoop_hostname>:<port>/path/to/preprocessed_data.csv"
df_one.write.csv(preprocessed_data_path, mode="overwrite", header=True)