# Data Processing with Spark and Hadoop

Hadoop MapReduce Job

In [None]:
# Importing necessary libraries
from collections import defaultdict
# providing the dataset path
data_path = "/content/retailstore_5mn.csv"
# Reading the CSV file (simulating HDFS read)
import pandas as pd
df = pd.read_csv(data_path)
# Simulating Hadoop MapReduce: Map function
def mapper(df):
    result = defaultdict(int)
    for index, row in df.iterrows():
        result[row['Gender']] += 1
    return result
# Simulating Hadoop MapReduce: Reduce function
def reducer(mapped_data):
    result = defaultdict(int)
    for key, value in mapped_data.items():
        result[key] += value
    return result
# Running the map function
mapped_data = mapper(df)
# Running the reduce function
reduced_data = reducer(mapped_data)
# Printing the final result
print("Gender Frequency Count:", reduced_data)

Gender Frequency Count: defaultdict(<class 'int'>, {'Male': 2603096, 'Female': 2412641})


Apache Spark Job (Using PySpark)

In [None]:
# Installing and setting up PySpark
!pip install pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, count
# Starting the Spark Session
spark = SparkSession.builder.master("local[*]").appName("RetailStoreAnalytics").getOrCreate()



In [None]:
# Loading the dataset
data_path = "/content/retailstore_5mn.csv"
# Reading the dataset using Spark
df_spark = spark.read.csv(data_path, header=True, inferSchema=True)
# Showing the first few rows of the dataset
df_spark.show()

+----------+-----------+------+------+-------+
|CustomerID|        Age|Salary|Gender|Country|
+----------+-----------+------+------+-------+
|         1|       18.0| 20000|  Male|Germany|
|         2|       19.0| 22000|Female| France|
|         3|       20.0| 24000|Female|England|
|         4|       21.0|  2600|  Male|England|
|         5|       22.0| 50000|  Male| France|
|         6|       23.0| 35000|Female|England|
|         7|       24.0|  4300|  Male|Germany|
|         8|       25.0| 32000|Female| France|
|         9|       35.0| 35000|  Male|Germany|
|        10|       27.0| 37000|Female| France|
|        11|       31.0| 25000|  Male|Germany|
|        12|32.38181818| 27000|Female| France|
|        13|33.76363636| 29000|Female|England|
|        14|35.14545455|  7600|  Male|England|
|        15|36.52727273| 55000|  Male| France|
|        16|37.90909091| 40000|Female|England|
|        17|39.29090909|  9300|  Male|Germany|
|        18|40.67272727| 37000|Female| France|
|        19|4

In [None]:
# Data Aggregation: Calculating the average salary by gender
avg_salary = df_spark.groupBy("Gender").agg(avg("Salary").alias("Average_Salary"))
avg_salary.show()

+------+------------------+
|Gender|    Average_Salary|
+------+------------------+
|Female| 35388.00070130616|
|  Male|35387.738139507725|
+------+------------------+



In [None]:
# Data Filtering: Filtering rows where Salary is greater than 30,000
high_salary_df = df_spark.filter(df_spark['Salary'] > 30000)
high_salary_df.show()

+----------+-----------+------+------+-------+
|CustomerID|        Age|Salary|Gender|Country|
+----------+-----------+------+------+-------+
|         5|       22.0| 50000|  Male| France|
|         6|       23.0| 35000|Female|England|
|         8|       25.0| 32000|Female| France|
|         9|       35.0| 35000|  Male|Germany|
|        10|       27.0| 37000|Female| France|
|        15|36.52727273| 55000|  Male| France|
|        16|37.90909091| 40000|Female|England|
|        18|40.67272727| 37000|Female| France|
|        19|42.05454545| 40000|  Male|Germany|
|        20|43.43636364| 42000|Female| France|
|        22|       46.2| 32000|Female| France|
|        23|47.58181818| 34000|Female| France|
|        25|50.34545455| 60000|Female|Germany|
|        26|51.72727273| 45000|Female|Germany|
|        28|54.49090909| 42000|  Male|Germany|
|        29|55.87272727| 45000|  Male|England|
|        30|57.25454545| 47000|  Male|England|
|        31|58.63636364| 35000|  Male|England|
|        32|6

In [None]:
# Exploratory Data Analysis (EDA): Counting the number of people by country
country_count = df_spark.groupBy("Country").agg(count("CustomerID").alias("Customer_Count"))
country_count.show()

+-------+--------------+
|Country|Customer_Count|
+-------+--------------+
|Germany|       1332307|
| France|       1410671|
|England|       2272759|
+-------+--------------+



In [None]:
# Stopping the Spark session when done
spark.stop()

# Advanced Analytics and Machine Learning

In [10]:
# Importing necessary libraries
from pyspark.sql import SparkSession
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.sql.functions import col
# Initializing the Spark session
spark = SparkSession.builder.appName("GenderPrediction").getOrCreate()

In [11]:
# Loading the dataset
data_path = "/content/retailstore_5mn.csv"
df = spark.read.csv(data_path, header=True, inferSchema=True)
# Showing the first few rows of the data
df.show(5)

+----------+----+------+------+-------+
|CustomerID| Age|Salary|Gender|Country|
+----------+----+------+------+-------+
|         1|18.0| 20000|  Male|Germany|
|         2|19.0| 22000|Female| France|
|         3|20.0| 24000|Female|England|
|         4|21.0|  2600|  Male|England|
|         5|22.0| 50000|  Male| France|
+----------+----+------+------+-------+
only showing top 5 rows



In [12]:
# Managing the missing values
df = df.dropna()
# Converting the 'Gender' column to the numeric
from pyspark.ml.feature import StringIndexer
indexer = StringIndexer(inputCol="Gender", outputCol="GenderIndex")
df = indexer.fit(df).transform(df)
# Choosing significant features for the model
features = ["Age", "Salary"]
df = df.select("GenderIndex", *features)
# Developing the feature vector
assembler = VectorAssembler(inputCols=features, outputCol="features")
df = assembler.transform(df)


In [13]:
# Splitting the data into training and testing sets
train_data, test_data = df.randomSplit([0.8, 0.2], seed=1234)

In [14]:
# Initializing the Logistic Regression model
lr = LogisticRegression(labelCol="GenderIndex", featuresCol="features")
# Training the model on the training data
lr_model = lr.fit(train_data)

In [15]:
# Making the predictions on the test data
predictions = lr_model.transform(test_data)
# Evaluating the model utilizing the BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator(labelCol="GenderIndex")
accuracy = evaluator.evaluate(predictions)
print(f"Model Accuracy: {accuracy}")

Model Accuracy: 0.49945660161104444
