
# **Task Four (Big Data Analysis)**

**ANALYSIS USING APACHE SPARK (with PySpark)**





1.   **Environment Setup**







In [4]:
# STEP 1: Install Java
!apt-get install openjdk-11-jdk -y

# STEP 2: Download Spark 3.3.0 with Hadoop 3
!wget -q https://archive.apache.org/dist/spark/spark-3.3.0/spark-3.3.0-bin-hadoop3.tgz

# STEP 3: Extract Spark
!tar -xvzf spark-3.3.0-bin-hadoop3.tgz

# STEP 4: Set environment variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.3.0-bin-hadoop3"

# STEP 5: Install findspark
!pip install -q findspark

# STEP 6: Initialize findspark
import findspark
findspark.init()

# STEP 7: Test SparkSession
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Spark Setup Test") \
    .getOrCreate()

# Show a simple DataFrame
spark.range(5).show()



Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
openjdk-11-jdk is already the newest version (11.0.27+6~us1-0ubuntu1~22.04).
0 upgraded, 0 newly installed, 0 to remove and 35 not upgraded.
spark-3.3.0-bin-hadoop3/
spark-3.3.0-bin-hadoop3/NOTICE
spark-3.3.0-bin-hadoop3/kubernetes/
spark-3.3.0-bin-hadoop3/kubernetes/tests/
spark-3.3.0-bin-hadoop3/kubernetes/tests/python_executable_check.py
spark-3.3.0-bin-hadoop3/kubernetes/tests/autoscale.py
spark-3.3.0-bin-hadoop3/kubernetes/tests/worker_memory_check.py
spark-3.3.0-bin-hadoop3/kubernetes/tests/py_container_checks.py
spark-3.3.0-bin-hadoop3/kubernetes/tests/decommissioning.py
spark-3.3.0-bin-hadoop3/kubernetes/tests/pyfiles.py
spark-3.3.0-bin-hadoop3/kubernetes/tests/decommissioning_cleanup.py
spark-3.3.0-bin-hadoop3/kubernetes/dockerfiles/
spark-3.3.0-bin-hadoop3/kubernetes/dockerfiles/spark/
spark-3.3.0-bin-hadoop3/kubernetes/dockerfiles/spark/decom.sh
spark-3.3.0-bin-hadoop3/kubernetes

In [5]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.3.0-bin-hadoop3"
import findspark
findspark.init()
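
If `findspark.init()` fails, a common cause is that one of the two environment variables points to a directory that does not exist (for example, after a runtime restart wiped `/content`). The following is a minimal sketch of a pre-flight check; the helper name `missing_spark_paths` is hypothetical and not part of findspark or PySpark.

```python
import os

def missing_spark_paths(env):
    """Return the required variables that are unset or point to a missing directory."""
    required = ("JAVA_HOME", "SPARK_HOME")
    return [name for name in required
            if not env.get(name) or not os.path.isdir(env[name])]

# Illustration with a deliberately bogus SPARK_HOME (hypothetical path).
# Pass os.environ instead to check the real session.
example_env = {"JAVA_HOME": os.getcwd(), "SPARK_HOME": "/no/such/spark/dir"}
print(missing_spark_paths(example_env))
```

An empty list means both directories exist, so `findspark.init()` has what it needs to locate Spark.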



2.   **Load Dataset**



In [8]:
from google.colab import files
uploaded = files.upload()

Saving OnlineRetail.csv to OnlineRetail.csv


In [9]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("OnlineRetail").getOrCreate()
df = spark.read.csv("/content/OnlineRetail.csv", header=True, inferSchema=True)
df.show(5)
df.printSchema()


+---------+---------+--------------------+--------+--------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|   InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+--------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|12/1/2010 8:26|     2.55|     17850|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|12/1/2010 8:26|     2.75|     17850|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|
+---------+---------+--------------------+--------+--------------+---------+----------+--------------+
only showing top 5 rows

root
 |-- InvoiceNo: string (nullable = true)
 |



3.  **Data Cleaning**




In [10]:
from pyspark.sql.functions import col

# Remove rows with nulls in any column the analysis depends on
df = df.dropna(subset=["InvoiceNo", "CustomerID", "Description", "Quantity", "UnitPrice"])

# Filter out canceled orders (invoice numbers that start with "C")
df = df.filter(~col("InvoiceNo").startswith("C"))

# Add TotalPrice column: revenue per invoice line
df = df.withColumn("TotalPrice", col("Quantity") * col("UnitPrice"))
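
To make the three cleaning rules concrete, here is a plain-Python sketch that applies them to a handful of rows without Spark. The first sample row comes from the preview table above; the other two rows (`C000001`, `000002`) are made up for illustration, and `clean_rows` is a hypothetical helper.

```python
def clean_rows(rows):
    """Drop incomplete rows, drop canceled invoices, and add TotalPrice."""
    required = ("InvoiceNo", "CustomerID", "Description", "Quantity", "UnitPrice")
    cleaned = []
    for row in rows:
        if any(row.get(key) is None for key in required):
            continue  # mirrors dropna(subset=...)
        if row["InvoiceNo"].startswith("C"):
            continue  # mirrors the canceled-order filter
        cleaned.append({**row, "TotalPrice": row["Quantity"] * row["UnitPrice"]})
    return cleaned

sample = [
    {"InvoiceNo": "536365", "CustomerID": 17850, "Description": "WHITE METAL LANTERN",
     "Quantity": 6, "UnitPrice": 3.39},
    # Hypothetical canceled order
    {"InvoiceNo": "C000001", "CustomerID": 17850, "Description": "WHITE METAL LANTERN",
     "Quantity": -6, "UnitPrice": 3.39},
    # Hypothetical row with a missing customer
    {"InvoiceNo": "000002", "CustomerID": None, "Description": "WHITE METAL LANTERN",
     "Quantity": 1, "UnitPrice": 3.39},
]

result = clean_rows(sample)
print(len(result), result[0]["InvoiceNo"])
```

Only the first row survives, with a `TotalPrice` of roughly 20.34.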




4.   **Add Time Columns**




In [11]:
from pyspark.sql.functions import to_timestamp, year, month, dayofweek

# Parse strings such as "12/1/2010 8:26" into timestamps
df = df.withColumn("InvoiceDate", to_timestamp("InvoiceDate", "M/d/yyyy H:mm"))
df = df.withColumn("Year", year("InvoiceDate"))
df = df.withColumn("Month", month("InvoiceDate"))
# dayofweek numbers the days from 1 (Sunday) to 7 (Saturday)
df = df.withColumn("Weekday", dayofweek("InvoiceDate"))
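
Spark's `dayofweek` counts from 1 (Sunday) to 7 (Saturday), which differs from Python's Monday-first conventions. The plain-Python sketch below reproduces that numbering for one timestamp from the preview table, as a quick way to sanity-check the `Weekday` column. The function name `spark_style_dayofweek` is hypothetical.

```python
from datetime import datetime

def spark_style_dayofweek(text, fmt="%m/%d/%Y %H:%M"):
    """Parse an InvoiceDate string and return the weekday as 1 = Sunday ... 7 = Saturday."""
    parsed = datetime.strptime(text, fmt)
    # isoweekday() is 1 = Monday ... 7 = Sunday; shift it to Sunday-first numbering.
    return parsed.isoweekday() % 7 + 1

weekday = spark_style_dayofweek("12/1/2010 8:26")
print(weekday)  # 1 December 2010 was a Wednesday, so this prints 4
```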




5.   **Descriptive Statistics**




In [12]:
df.describe(["Quantity", "UnitPrice", "TotalPrice"]).show()


+-------+------------------+------------------+------------------+
|summary|          Quantity|         UnitPrice|        TotalPrice|
+-------+------------------+------------------+------------------+
|  count|            397924|            397924|            397924|
|   mean|13.021823262733587|3.1161744805546268|22.394748504742306|
| stddev| 180.4202099169817|22.096788031685826| 309.0555883801396|
|    min|                 1|               0.0|               0.0|
|    max|             80995|           8142.75|          168469.6|
+-------+------------------+------------------+------------------+



6. **Sales Analysis**



1.   ***Monthly Revenue***




In [13]:
df.groupBy("Year", "Month") \
  .sum("TotalPrice") \
  .orderBy("Year", "Month") \
  .show()


+----+-----+------------------+
|Year|Month|   sum(TotalPrice)|
+----+-----+------------------+
|2010|   12| 572713.8900000163|
|2011|    1| 569445.0400000077|
|2011|    2| 447137.3500000165|
|2011|    3|  595500.760000013|
|2011|    4| 469200.3610000132|
|2011|    5| 678594.5600000018|
|2011|    6| 661213.6900000116|
|2011|    7| 600091.0110000141|
|2011|    8|  645343.900000004|
|2011|    9| 952838.3819999964|
|2011|   10|1039318.7899999822|
|2011|   11|1161817.3799999433|
|2011|   12| 518192.7900000037|
+----+-----+------------------+
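
The long decimal tails in `sum(TotalPrice)` (for example `572713.8900000163`) are floating-point accumulation artifacts, not real precision. A plain-Python sketch of the same group-and-sum logic, with the totals rounded to two decimals, is shown below. The sample rows are made up, and `monthly_revenue` is a hypothetical helper. In Spark, wrapping the aggregate in `pyspark.sql.functions.round(..., 2)` would be one way to get the same effect.

```python
from collections import defaultdict

def monthly_revenue(rows):
    """Sum total_price per (year, month) and round each total to 2 decimals."""
    totals = defaultdict(float)
    for year, month, total_price in rows:
        totals[(year, month)] += total_price
    return {key: round(value, 2) for key, value in sorted(totals.items())}

# Hypothetical line items: (year, month, total_price)
rows = [
    (2010, 12, 15.30),
    (2010, 12, 20.34),
    (2011, 1, 22.0),
    (2011, 1, 0.1),
    (2011, 1, 0.2),
]

revenue = monthly_revenue(rows)
print(revenue)
```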




2.   ***Top Selling Products***



In [14]:
df.groupBy("Description") \
  .sum("Quantity") \
  .orderBy("sum(Quantity)", ascending=False) \
  .show(10)


+--------------------+-------------+
|         Description|sum(Quantity)|
+--------------------+-------------+
|PAPER CRAFT , LIT...|        80995|
|MEDIUM CERAMIC TO...|        77916|
|WORLD WAR 2 GLIDE...|        54415|
|JUMBO BAG RED RET...|        46181|
|WHITE HANGING HEA...|        36725|
|ASSORTED COLOUR B...|        35362|
|PACK OF 72 RETROS...|        33693|
|      POPCORN HOLDER|        30931|
|  RABBIT NIGHT LIGHT|        27202|
|MINI PAINT SET VI...|        26076|
+--------------------+-------------+
only showing top 10 rows




3.   ***Top Products by Revenue***



In [15]:
df.groupBy("Description") \
  .sum("TotalPrice") \
  .orderBy("sum(TotalPrice)", ascending=False) \
  .show(10)


+--------------------+------------------+
|         Description|   sum(TotalPrice)|
+--------------------+------------------+
|PAPER CRAFT , LIT...|          168469.6|
|REGENCY CAKESTAND...|142592.94999999966|
|WHITE HANGING HEA...|100448.14999999953|
|JUMBO BAG RED RET...| 85220.78000000044|
|MEDIUM CERAMIC TO...| 81416.72999999998|
|             POSTAGE| 77803.95999999999|
|       PARTY BUNTING| 68844.33000000006|
|ASSORTED COLOUR B...| 56580.34000000046|
|              Manual| 53779.93000000001|
|  RABBIT NIGHT LIGHT|51346.199999999975|
+--------------------+------------------+
only showing top 10 rows
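
The ranking above includes `POSTAGE` and `Manual`, which look like charges or adjustments rather than products. The sketch below, in plain Python with made-up revenue figures, shows one way to exclude such labels before ranking. Treating these two labels as non-products is an assumption based only on their names, and `top_products_by_revenue` is a hypothetical helper.

```python
# Assumption: these labels are not real products; adjust the set after inspecting the data.
NON_PRODUCT_LABELS = {"POSTAGE", "Manual"}

def top_products_by_revenue(rows, n=3, exclude=NON_PRODUCT_LABELS):
    """Sum revenue per description, skip excluded labels, return the top n pairs."""
    totals = {}
    for description, total_price in rows:
        if description in exclude:
            continue
        totals[description] = totals.get(description, 0.0) + total_price
    ranked = sorted(totals.items(), key=lambda item: item[1], reverse=True)
    return ranked[:n]

# Hypothetical line items: (description, total_price)
rows = [
    ("PARTY BUNTING", 10.0),
    ("POSTAGE", 50.0),
    ("RABBIT NIGHT LIGHT", 7.5),
    ("Manual", 30.0),
    ("PARTY BUNTING", 5.0),
]

top = top_products_by_revenue(rows)
print(top)
```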




4.   ***Country-wise Revenue***



In [16]:
df.groupBy("Country") \
  .sum("TotalPrice") \
  .orderBy("sum(TotalPrice)", ascending=False) \
  .show()


+---------------+------------------+
|        Country|   sum(TotalPrice)|
+---------------+------------------+
| United Kingdom| 7308391.554000208|
|    Netherlands| 285446.3399999992|
|           EIRE|265545.89999999903|
|        Germany|228867.14000000025|
|         France|209024.05000000022|
|      Australia|138521.30999999976|
|          Spain| 61577.11000000017|
|    Switzerland|56443.950000000084|
|        Belgium| 41196.34000000001|
|         Sweden| 38378.32999999999|
|          Japan|37416.369999999995|
|         Norway|          36165.44|
|       Portugal|33439.889999999956|
|        Finland|22546.079999999994|
|      Singapore|21279.289999999997|
|Channel Islands|20450.439999999995|
|        Denmark|18955.339999999986|
|          Italy| 17483.23999999999|
|         Cyprus|13590.379999999986|
|        Austria|10198.679999999995|
+---------------+------------------+
only showing top 20 rows



7. **Customer Behavior Analysis**




1.   ***RFM (Recency, Frequency, Monetary)***



In [17]:
# Import under a namespace so Spark's max and sum do not shadow Python's built-ins
from pyspark.sql import functions as F

# Recency (assume the last invoice date is the reference point)
max_date = df.agg(F.max("InvoiceDate")).collect()[0][0]

rfm = df.groupBy("CustomerID").agg(
    # Recency: days between the reference date and the customer's last purchase
    F.datediff(F.lit(max_date), F.max("InvoiceDate")).alias("Recency"),
    # Frequency: number of distinct invoices
    F.countDistinct("InvoiceNo").alias("Frequency"),
    # Monetary: total amount spent
    F.sum("TotalPrice").alias("Monetary")
)
rfm.show(10)


+----------+-------+---------+------------------+
|CustomerID|Recency|Frequency|          Monetary|
+----------+-------+---------+------------------+
|     17389|      0|       34|31833.679999999997|
|     14450|    180|        3|            483.25|
|     15727|     16|        7|           5178.96|
|     13285|     23|        4|2709.1199999999994|
|     14570|    280|        2|218.05999999999997|
|     16503|    106|        4|1431.9299999999998|
|     17420|     50|        3| 598.8299999999999|
|     15447|    330|        1|            155.17|
|     13623|     30|        5| 747.7800000000001|
|     18024|    152|        2|389.78000000000003|
+----------+-------+---------+------------------+
only showing top 10 rows
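
As a worked example of how the three RFM values are derived, the plain-Python sketch below computes them for two made-up customers. The customer IDs, invoice numbers, dates, and amounts are all hypothetical, as is the helper `rfm_table`. `Monetary` is rounded here for readability, which the Spark cell does not do.

```python
from datetime import datetime

def rfm_table(invoices):
    """invoices: iterable of (customer_id, invoice_no, invoice_date, total_price)."""
    invoices = list(invoices)
    # Reference point: the most recent invoice date in the data
    reference = max(date for _, _, date, _ in invoices)
    grouped = {}
    for customer, invoice_no, date, total in invoices:
        entry = grouped.setdefault(
            customer, {"last": date, "invoices": set(), "monetary": 0.0})
        entry["last"] = max(entry["last"], date)
        entry["invoices"].add(invoice_no)
        entry["monetary"] += total
    return {
        customer: {
            # Whole calendar days between the reference date and the last purchase
            "Recency": (reference.date() - entry["last"].date()).days,
            "Frequency": len(entry["invoices"]),
            "Monetary": round(entry["monetary"], 2),
        }
        for customer, entry in grouped.items()
    }

# Hypothetical invoice lines
invoices = [
    ("A", "1001", datetime(2011, 12, 9, 12, 50), 20.0),
    ("A", "1001", datetime(2011, 12, 9, 12, 50), 5.5),
    ("A", "1002", datetime(2011, 11, 1, 9, 0), 10.0),
    ("B", "1003", datetime(2011, 6, 12, 10, 0), 40.25),
]

table = rfm_table(invoices)
print(table)
```

Customer A bought on the reference date itself (recency 0) across two distinct invoices, while customer B last bought 180 days earlier on a single invoice.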



8. **Clustering Customers with K-Means (ML)**
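
K-means groups points by Euclidean distance, so a feature with a much larger range can dominate the result. In the RFM table above, `Monetary` reaches the tens of thousands while `Frequency` stays in the single or double digits. The plain-Python sketch below shows z-score standardization on three rows taken from that table (values rounded to two decimals). The helper `standardize_columns` is hypothetical and uses the population standard deviation. In Spark ML, a scaler such as `StandardScaler` from `pyspark.ml.feature` could play a similar role before fitting.

```python
from statistics import mean, pstdev

def standardize_columns(rows):
    """Z-score each column of a list of equal-length numeric rows."""
    columns = list(zip(*rows))
    means = [mean(col) for col in columns]
    # Fall back to 1.0 for constant columns to avoid dividing by zero
    stds = [pstdev(col) or 1.0 for col in columns]
    return [[(value - m) / s for value, m, s in zip(row, means, stds)]
            for row in rows]

# (Recency, Frequency, Monetary) for three customers from the RFM table
rows = [
    [0, 34, 31833.68],
    [180, 3, 483.25],
    [16, 7, 5178.96],
]

scaled = standardize_columns(rows)
for row in scaled:
    print([round(value, 3) for value in row])
```

After scaling, every column has mean 0 and unit standard deviation, so no single feature outweighs the others in the distance calculation.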

In [18]:
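from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans

# Combine the three RFM columns into a single feature vector
assembler = VectorAssembler(inputCols=["Recency", "Frequency", "Monetary"], outputCol="features")
rfm_vector = assembler.transform(rfm).select("CustomerID", "features")

# Fit K-means with 4 clusters; the fixed seed makes the run reproducible
kmeans = KMeans(k=4, seed=42)
model = kmeans.fit(rfm_vector)
# transform() adds a "prediction" column holding each customer's cluster index
rfm_result = model.transform(rfm_vector)
rfm_result.show()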


+----------+--------------------+----------+
|CustomerID|            features|prediction|
+----------+--------------------+----------+
|     17389|[0.0,34.0,31833.6...|         3|
|     14450|  [180.0,3.0,483.25]|         0|
|     15727|  [16.0,7.0,5178.96]|         0|
|     13285|[23.0,4.0,2709.11...|         0|
|     14570|[280.0,2.0,218.05...|         0|
|     16503|[106.0,4.0,1431.9...|         0|
|     17420|[50.0,3.0,598.829...|         0|
|     15447|  [330.0,1.0,155.17]|         0|
|     13623|[30.0,5.0,747.780...|         0|
|     18024|[152.0,2.0,389.78...|         0|
|     16861|   [59.0,2.0,173.76]|         0|
|     16339|[284.0,1.0,109.95...|         0|
|     16386|    [28.0,2.0,317.2]|         0|
|     15619|[10.0,1.0,336.400...|         0|
|     15790|[10.0,1.0,220.849...|         0|
|     16574|[71.0,1.0,451.439...|         0|
|     13832|[19.0,1.0,52.1999...|         0|
|     15957|[31.0,1.0,428.889...|         0|
|     17679|[52.0,2.0,1992.11...|         0|
|     1294