In [2]:
#Pip install pyspark
!pip install pyspark

Defaulting to user installation because normal site-packages is not writeable
Looking in links: /usr/share/pip-wheels


In [3]:
#Import libraries 
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark import SparkContext


In [4]:
import os

# Report whether the JAVA_HOME environment variable is configured
java_home = os.environ.get('JAVA_HOME')
if not java_home:
    print("JAVA_HOME is not set.")
else:
    print("JAVA_HOME is set to:", java_home)


JAVA_HOME is set to: /usr/lib/jvm/default-java


In [5]:
# Build (or reuse) the Spark session used by the rest of the notebook
spark = (
    SparkSession.builder
    .appName('RetailMapReduce')
    .config("spark.executor.memory", "2g")
    .config("spark.driver.memory", "1g")
    .config("spark.sql.shuffle.partitions", "100")
    .getOrCreate()
)

24/08/05 07:52:19 WARN Utils: Your hostname, blue-nbjupyterhub1 resolves to a loopback address: 127.0.0.1; using 10.0.0.11 instead (on interface ens5)
24/08/05 07:52:19 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/08/05 07:52:20 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [6]:
# Read retail.csv, taking column names from the first line and using a
# double quote as the escape character
df = spark.read.csv(
    path='retail.csv',
    header=True,
    escape='"',
)

In [7]:
# Verify loading - render the loaded rows as a pandas table
# NOTE(review): toPandas() pulls every row to the driver — fine for a small
# file; confirm before pointing this at larger data
pandas_df = df.toPandas()
display(pandas_df)

Unnamed: 0,InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country
0,536365,85123A,WHITE HANGING HEART T-LIGHT HOLDER,6,12/1/10 8:26,2.55,17850,United Kingdom
1,536365,71053,WHITE METAL LANTERN,6,12/1/10 8:26,3.39,17850,United Kingdom
2,536365,84406B,CREAM CUPID HEARTS COAT HANGER,8,12/1/10 8:26,2.75,17850,United Kingdom
3,536365,84029G,KNITTED UNION FLAG HOT WATER BOTTLE,6,12/1/10 8:26,3.39,17850,United Kingdom
4,536365,84029E,RED WOOLLY HOTTIE WHITE HEART.,6,12/1/10 8:26,3.39,17850,United Kingdom
...,...,...,...,...,...,...,...,...
2495,536592,20761,BLUE PAISLEY SKETCHBOOK,1,12/1/10 17:06,7.62,,United Kingdom
2496,536592,20780,BLACK EAR MUFF HEADPHONES,1,12/1/10 17:06,11.02,,United Kingdom
2497,536592,20846,ZINC HEART LATTICE T-LIGHT HOLDER,1,12/1/10 17:06,2.51,,United Kingdom
2498,536592,20914,SET/5 RED RETROSPOT LID GLASS BOWLS,1,12/1/10 17:06,5.91,,United Kingdom


In [8]:
# Convert DataFrame to RDD so the map/reduce steps can use the RDD API
rdd = df.rdd

In [9]:
# Verify rdd: a bare expression as the cell's last line shows the RDD's repr
rdd

MapPartitionsRDD[16] at javaToPython at NativeMethodAccessorImpl.java:0

In [10]:
# Import SparkSession plus the standard-library helpers (defaultdict, re)
from pyspark.sql import SparkSession
from collections import defaultdict
import re

In [11]:
# Function to extract attributes from text
def extract_attributes(text):
    """Count occurrences of known retail attribute keywords in ``text``.

    Each attribute is matched as a whole word, case-insensitively.

    Args:
        text: The string to scan (e.g. a product description). ``None`` is
            treated as text containing no attributes, so a row with a
            missing description does not raise inside ``re.findall``.

    Returns:
        A ``dict_items`` view of ``(attribute, count)`` pairs, one for each
        attribute that matched at least once, in pattern-definition order.
    """
    # Define attribute patterns relevant to retail data
    attribute_patterns = {
        'electronics': r'\belectronics\b',
        'furniture': r'\bfurniture\b',
        'clothing': r'\bclothing\b',
        'toys': r'\btoys\b',
        'home': r'\bhome\b',
        'blue': r'\bblue\b',
        # Add more attributes as needed
    }

    # re.findall raises TypeError on None; a missing description simply
    # contributes no attributes.
    if text is None:
        return {}.items()

    attribute_counts = {}
    for attribute, pattern in attribute_patterns.items():
        hits = re.findall(pattern, text, flags=re.IGNORECASE)
        if hits:
            attribute_counts[attribute] = len(hits)
    return attribute_counts.items()


In [19]:
# Extract header and data rows
# NOTE(review): neither `header` nor `data_rdd` is referenced again in the
# visible part of this notebook — confirm they are still needed
header = df.columns  # Get the header column names
data_rdd = rdd.map(lambda row: row.asDict())  # Convert Row objects to dictionaries

In [None]:
# Define a function to extract the 'Description' field and apply attribute extraction
def parse_row(row):
    """Extract attribute counts from a row dictionary's 'Description' field.

    Args:
        row: A mapping with a ``get`` method (e.g. the dicts produced by
            ``row.asDict()``).

    Returns:
        Whatever ``extract_attributes`` returns for the description text.
    """
    # dict.get's default only covers a *missing* key; a key that is present
    # with a None value (a null description) must also fall back to ''.
    description = row.get('Description') or ''
    return extract_attributes(description)

In [12]:
# Map step: emit (attribute, count) pairs for each row's description.
# Description may be None for rows with a missing value; substitute an empty
# string so the regex matching never receives a non-string.
mapped_rdd = rdd.flatMap(lambda row: extract_attributes(row['Description'] or ''))


In [13]:
# Reduce step: sum the per-row counts for each attribute key
reduced_rdd = mapped_rdd.reduceByKey(lambda total, count: total + count)

In [14]:
# Cache the RDD so repeated actions on it can reuse the reduced result
# NOTE(review): the visible cells run only one action (collect) on
# reduced_rdd — confirm the cache is still needed
reduced_rdd.cache()

PythonRDD[21] at RDD at PythonRDD.scala:53

In [15]:
# Collect the results to the driver as a Python list of (attribute, count) pairs
results = reduced_rdd.collect()


                                                                                

In [16]:
from pyspark.sql import Row

# Convert results to DataFrame.
# An explicit schema is supplied because schema inference fails on an empty
# list, which is what `results` is when no attribute matched any description.
results_df = spark.createDataFrame(results, schema='Attribute string, Count long')

In [17]:
# Display the DataFrame (show() prints the rows as a text table)
results_df.show()

+---------+-----+
|Attribute|Count|
+---------+-----+
|     home|   23|
|     blue|   73|
|     toys|    3|
+---------+-----+



In [24]:
# Write the attribute counts as CSV with a header row, replacing any previous output
(
    results_df.write
    .mode('overwrite')
    .option("header", "true")
    .csv("output/csvfile")
)

                                                                                

In [None]:
# Stop Spark session
#spark.stop()
