In [31]:
# Set up PySpark on your machine or cluster. Ensure Spark is installed and configured properly.
import csv
import math

from pyspark import SparkContext, SparkConf

# Create the SparkContext, or reuse the one that is already active
# (make sure to configure properly for your cluster).
# SparkContext(conf=conf) raises ValueError when this cell is re-run in the
# same kernel, because only one context may be active at a time; getOrCreate
# makes the cell safe to re-execute. Note that `conf` is only applied when a
# new context is actually created.
conf = SparkConf().setAppName("MyMapReduceJob")
sc = SparkContext.getOrCreate(conf=conf)

In [33]:
# Read the retail CSV into an RDD of raw text lines
retail_csv_path = "retail_data.csv"
lines = sc.textFile(retail_csv_path)

In [34]:
# Show the RDD's repr as the cell output to confirm the handle was created
lines

retail_data.csv MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0

In [35]:
# Drop the header row and parse every remaining line into a list of fields
def split_csv_line(line):
    """Parse one CSV text line into its list of fields, honouring quotes.

    A plain ``line.split(",")`` cuts quoted values that contain commas into
    several pieces, which shifts every later column. An empty line yields an
    empty list.
    """
    return next(csv.reader([line]), [])


header = lines.first()
data = lines.filter(lambda line: line != header).map(split_csv_line)

                                                                                

In [36]:
# Convert data to (Country, TotalAmount) tuples with error handling
def parse_line(fields):
    """Convert one parsed CSV row into a ``(country, total_amount)`` pair.

    Args:
        fields: Sequence of string column values for a single row. Index 8 is
            read as the country and index 6 as the total amount.

    Returns:
        ``(country, total_amount)`` where the country has surrounding
        whitespace stripped and the amount is a finite float, or ``None`` when
        the row is too short, the amount is not a finite number, or the
        country does not consist solely of letters and spaces.
    """
    try:
        # Strip so that " France" and "France" aggregate under the same key
        country = fields[8].strip()
        total_amount = float(fields[6])
    except (ValueError, IndexError):
        # Short row or non-numeric amount: treat as invalid
        return None

    # float() accepts "nan"/"inf"; one such value would poison a country's sum
    if not math.isfinite(total_amount):
        return None

    # Simple heuristic: a country name contains only letters and spaces
    if not country.replace(' ', '').isalpha():
        return None

    return (country, total_amount)

# Run the aggregation inside try/finally so the SparkContext is always
# stopped, even when the job fails part-way (otherwise the context stays
# active after the error).
try:
    # Keep only the rows that parse_line could turn into (Country, TotalAmount)
    transactions = data.map(parse_line).filter(lambda x: x is not None)

    # Map step: parse_line already emits (Country, TotalAmount) pairs, so no
    # further per-record transformation is needed before reducing.
    mapped = transactions

    # Reduce step: Aggregate by Country and calculate total amount
    reduced = mapped.reduceByKey(lambda x, y: x + y)

    # Collect the per-country totals on the driver
    results = reduced.collect()

    # Print results
    for country, total_amount in results:
        print(f"Country: {country}, Total Amount: {total_amount}")
finally:
    # Stop the SparkContext
    sc.stop()

                                                                                

Country: France, Total Amount: 196503.09000000037
Country: Australia, Total Amount: 135330.1899999999
Country: Germany, Total Amount: 220791.78000000032
Country: EIRE, Total Amount: 249151.47999999908
Country: Italy, Total Amount: 16506.029999999988
Country: Lithuania, Total Amount: 1661.06
Country: Japan, Total Amount: 34616.06
Country: Iceland, Total Amount: 4299.800000000001
Country: Channel Islands, Total Amount: 19950.539999999986
Country: Cyprus, Total Amount: 12791.30999999999
Country: Sweden, Total Amount: 36374.14999999998
Country: Israel, Total Amount: 6953.85
Country: Singapore, Total Amount: 9054.690000000002
Country: Lebanon, Total Amount: 1693.8800000000003
Country: United Arab Emirates, Total Amount: 1877.08
Country: Saudi Arabia, Total Amount: 131.17
Country: Czech Republic, Total Amount: 707.72
Country: Unspecified, Total Amount: 2663.9299999999994
Country: Bahrain, Total Amount: 548.4
Country: United Kingdom, Total Amount: 6737626.720000062
Country: Netherlands, Total