<a href="https://colab.research.google.com/github/Abhiprameesh/Case-Study-NYC-Taxi-Analytics-Platform/blob/main/GlobalFinance_Snapshot.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!apt-get update # Update apt-get repository.
!apt-get install openjdk-8-jdk-headless -qq > /dev/null # Install Java.
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz # Download Apache Sparks.
!tar xf spark-3.1.1-bin-hadoop3.2.tgz # Unzip the tgz file.
!pip install -q findspark # Install findspark. Adds PySpark to the System path during runtime.

# Set environment variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"

!ls

# Initialize findspark
import findspark
findspark.init()

# Create a PySpark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

0% [Working]            Get:1 https://cli.github.com/packages stable InRelease [3,917 B]
0% [Connecting to archive.ubuntu.com (185.125.190.81)] [Connecting to security.                                                                               Get:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease [1,581 B]
Get:3 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,632 B]
Get:4 https://cli.github.com/packages stable/main amd64 Packages [357 B]
Hit:5 http://archive.ubuntu.com/ubuntu jammy InRelease
Get:6 http://security.ubuntu.com/ubuntu jammy-security InRelease [129 kB]
Get:7 https://r2u.stat.illinois.edu/ubuntu jammy InRelease [6,555 B]
Get:8 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ Packages [85.0 kB]
Get:9 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [128 kB]
Get:10 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  Packages [2,383 kB]
Get:11 https://ppa.launchpadco

In [2]:
retail_df = spark.read.csv(
    "/content/online_retail_II.csv",
    header=True,
    inferSchema=True
)

In [5]:
from pyspark.sql import functions as F

summary_df = retail_df.agg(
    F.count("*").alias("total_transactions"),
    F.countDistinct("Invoice").alias("total_invoices"),
    F.countDistinct("Customer ID").alias("total_distinct_customers"),
    F.countDistinct("StockCode").alias("total_distinct_products")
)

summary_df.show()

+------------------+--------------+------------------------+-----------------------+
|total_transactions|total_invoices|total_distinct_customers|total_distinct_products|
+------------------+--------------+------------------------+-----------------------+
|            529671|         28816|                    4383|                   4632|
+------------------+--------------+------------------------+-----------------------+



In [6]:
from pyspark.sql import functions as F

# Identify return transactions
returns_df = retail_df.filter(F.col("Quantity") < 0)

# Calculate revenue lost due to returns
# Multiplying Price by Quantity (which is negative) gives a negative revenue.
# Taking the sum and then absolute value gives the total revenue lost.
# Or, alternatively, sum Price * abs(Quantity).
revenue_lost_df = returns_df.withColumn("Revenue", F.col("Price") * F.abs(F.col("Quantity")))

total_revenue_lost = revenue_lost_df.agg(F.sum("Revenue")).collect()[0][0]

print(f"Total revenue lost due to returns: {total_revenue_lost:.2f}")

Total revenue lost due to returns: 630643.77


In [7]:
# Calculate total number of transactions
total_transactions = retail_df.count()

# Calculate number of return transactions
num_return_transactions = returns_df.count()

# Calculate percentage of transactions that are returns
percentage_returns = (num_return_transactions / total_transactions) * 100

print(f"Percentage of transactions that are returns: {percentage_returns:.2f}%")

Percentage of transactions that are returns: 2.33%


In [10]:
from pyspark.sql import functions as F

retail_with_revenue = retail_df.withColumn(
    "Revenue",
    F.col("Quantity") * F.col("Price")
)

sales_df = retail_with_revenue.filter(F.col("Revenue") > 0)

# Overall total revenue
total_revenue = sales_df.agg(F.sum("Revenue")).collect()[0][0]

# Average revenue per transaction (considering only sales transactions)
num_sales_transactions = sales_df.count()
average_revenue_per_transaction = total_revenue / num_sales_transactions if num_sales_transactions > 0 else 0

print(f"Overall Total Revenue: {total_revenue:.2f}")
print(f"Average Revenue per Transaction (for sales): {average_revenue_per_transaction:.2f}")

Overall Total Revenue: 10384980.13
Average Revenue per Transaction (for sales): 20.14


In [11]:
customer_revenue_df = sales_df.groupBy("Customer ID").agg(F.sum("Revenue").alias("TotalRevenue"))
customer_revenue_df.show()

+-----------+------------------+
|Customer ID|      TotalRevenue|
+-----------+------------------+
|    17884.0|2355.4399999999996|
|    14285.0|           1374.41|
|    16596.0|            329.48|
|    16822.0|            181.39|
|    17072.0|            282.05|
|    12671.0| 2622.481000000001|
|    14452.0|401.15000000000003|
|    12737.0|            3710.5|
|    15893.0|305.28000000000003|
|    14094.0|335.21999999999997|
|    14269.0|            295.73|
|    12467.0|132.79999999999998|
|    16916.0|            745.22|
|    13607.0|             400.3|
|    14024.0|            318.04|
|    13094.0|505.79999999999995|
|    17633.0|            732.55|
|    15846.0|107.01000000000002|
|    16656.0| 8207.370000000003|
|    17032.0|3004.5999999999995|
+-----------+------------------+
only showing top 20 rows



In [12]:
from pyspark.sql import functions as F

customer_tiers_df = customer_revenue_df.withColumn(
    "SpendingTier",
    F.when(F.col("TotalRevenue") < 1000, "Bronze")
    .when((F.col("TotalRevenue") >= 1000) & (F.col("TotalRevenue") <= 10000), "Silver")
    .otherwise("Gold")
)

customer_tiers_df.show()

+-----------+------------------+------------+
|Customer ID|      TotalRevenue|SpendingTier|
+-----------+------------------+------------+
|    17884.0|2355.4399999999996|      Silver|
|    14285.0|           1374.41|      Silver|
|    16596.0|            329.48|      Bronze|
|    16822.0|            181.39|      Bronze|
|    17072.0|            282.05|      Bronze|
|    12671.0| 2622.481000000001|      Silver|
|    14452.0|401.15000000000003|      Bronze|
|    12737.0|            3710.5|      Silver|
|    15893.0|305.28000000000003|      Bronze|
|    14094.0|335.21999999999997|      Bronze|
|    14269.0|            295.73|      Bronze|
|    12467.0|132.79999999999998|      Bronze|
|    16916.0|            745.22|      Bronze|
|    13607.0|             400.3|      Bronze|
|    14024.0|            318.04|      Bronze|
|    13094.0|505.79999999999995|      Bronze|
|    17633.0|            732.55|      Bronze|
|    15846.0|107.01000000000002|      Bronze|
|    16656.0| 8207.370000000003|  

In [13]:
tier_distribution_df = customer_tiers_df.groupBy("SpendingTier").count()
tier_distribution_df.show()

+------------+-----+
|SpendingTier|count|
+------------+-----+
|      Silver| 1613|
|        Gold|  113|
|      Bronze| 2587|
+------------+-----+



In [14]:
product_revenue_df = sales_df.groupBy("StockCode").agg(F.sum("Revenue").alias("ProductRevenue"))
product_revenue_df.show()

+---------+------------------+
|StockCode|    ProductRevenue|
+---------+------------------+
|    21248| 978.9799999999999|
|    22121| 7459.849999999985|
|    21889|  8860.32000000001|
|    22254| 971.6599999999999|
|   84899F|192.17000000000002|
|    21249|2731.5600000000036|
|    21259| 7516.630000000006|
|   90197B|147.79000000000002|
|    21894| 984.1499999999999|
|    90022| 72.21000000000001|
|    21331|            780.94|
|    20868|            298.72|
|   90210B|            185.24|
|   90177A|14.790000000000003|
|    90143|            377.52|
|    84881|            273.07|
|   85132b|             21.23|
|    21452| 3470.650000000004|
|   90026D|              59.5|
|   85115B|             12.75|
+---------+------------------+
only showing top 20 rows



In [15]:
top_products_df = product_revenue_df.withColumn(
    "RevenueContribution",
    (F.col("ProductRevenue") / total_revenue) * 100
).orderBy(F.desc("ProductRevenue"))

top_5_products = top_products_df.limit(5)
top_5_products.show()

+---------+------------------+-------------------+
|StockCode|    ProductRevenue|RevenueContribution|
+---------+------------------+-------------------+
|        M|         262999.78| 2.5325015224519696|
|    22423|171793.84999999986| 1.6542530441389907|
|   85123A|160019.29999999897| 1.5408724709644082|
|      DOT|         117585.97| 1.1322695708870614|
|   85099B| 90102.24999999977| 0.8676208219693086|
+---------+------------------+-------------------+



In [16]:
from pyspark.sql import functions as F

invoice_summary_df = sales_df.groupBy("Invoice").agg(
    F.sum("Quantity").alias("TotalQuantity"),
    F.sum("Revenue").alias("TotalRevenue"),
    F.countDistinct("StockCode").alias("DistinctProducts")
)

invoice_summary_df.show()

+-------+-------------+------------------+----------------+
|Invoice|TotalQuantity|      TotalRevenue|DistinctProducts|
+-------+-------------+------------------+----------------+
| 498934|          156|             899.0|             108|
| 502860|          134|106.96000000000001|              15|
| 515016|          151|273.95999999999987|              61|
| 527376|          228|            153.33|              53|
| 531036|          935|3424.0799999999995|             379|
| 495185|          776|           2507.06|              44|
| 502620|           61|172.35000000000002|              14|
| 506092|           64|105.71000000000001|              23|
| 517742|          316| 406.2600000000001|              35|
| 523989|         1378|           2154.15|             109|
| 519251|          482|1124.8000000000002|              43|
| 525091|          169|            265.33|              17|
| 498070|          130|            207.15|              53|
| 500148|          183|431.5900000000000

In [17]:
from pyspark.sql import functions as F

unique_products_per_country_df = sales_df.groupBy("Country").agg(
    F.countDistinct("StockCode").alias("UniqueProductsSold")
)

unique_products_per_country_df.show()

+-----------+------------------+
|    Country|UniqueProductsSold|
+-----------+------------------+
|     Sweden|               432|
|  Singapore|               110|
|    Germany|              1457|
|        RSA|               101|
|     France|              1297|
|     Greece|               419|
|    Belgium|               481|
|    Finland|               226|
|      Malta|               162|
|Unspecified|               245|
|    Nigeria|                30|
|      Italy|               510|
|       EIRE|              2047|
|  Lithuania|               113|
|     Norway|               219|
|      Spain|               682|
|    Denmark|               284|
|West Indies|                49|
|   Thailand|                76|
|  Hong Kong|                60|
+-----------+------------------+
only showing top 20 rows



In [18]:
from pyspark.sql import functions as F

product_revenue_per_country_df = sales_df.groupBy("Country", "StockCode").agg(
    F.sum("Revenue").alias("ProductRevenuePerCountry")
)

product_revenue_per_country_df.show()

+--------------+---------+------------------------+
|       Country|StockCode|ProductRevenuePerCountry|
+--------------+---------+------------------------+
|United Kingdom|   84032A|       5880.699999999995|
|United Kingdom|    22315|      1207.5799999999997|
|United Kingdom|    21125|                 1098.42|
|United Kingdom|    90093|      128.22999999999996|
|United Kingdom|    20994|                   39.95|
|United Kingdom|   90214H|                    46.9|
|United Kingdom|   84558a|                  107.68|
|United Kingdom|    84767|                  794.32|
|United Kingdom|    21796|                 2756.69|
|United Kingdom|   47503K|       435.7399999999998|
|United Kingdom|    21663|                  488.16|
|United Kingdom|    21253|                  620.03|
|United Kingdom|   84705D|       97.89999999999999|
|United Kingdom|   16201A|      213.36999999999998|
|United Kingdom|    10109|                    1.68|
|United Kingdom|   84250M|      13.619999999999996|
|United King

In [19]:
from pyspark.sql import functions as F

average_revenue_per_product_per_country_df = product_revenue_per_country_df.groupBy("Country").agg(
    F.avg("ProductRevenuePerCountry").alias("AverageRevenuePerProduct")
)

average_revenue_per_product_per_country_df.show()

+-----------+------------------------+
|    Country|AverageRevenuePerProduct|
+-----------+------------------------+
|     Sweden|      123.90136574074074|
|  Singapore|                  36.707|
|    Germany|       139.0918332189432|
|        RSA|       37.64366336633663|
|     France|      114.16141094834241|
|     Greece|       34.21400954653937|
|    Belgium|       51.05422037422038|
|    Finland|       32.65247787610619|
|      Malta|       33.16975308641973|
|Unspecified|        25.2498775510204|
|    Nigeria|       4.679666666666666|
|      Italy|      29.543490196078455|
|       EIRE|      186.38651685393253|
|  Lithuania|       43.29805309734513|
|     Norway|      118.09735159817355|
|      Spain|        69.7968035190616|
|    Denmark|      179.24947183098598|
|West Indies|       10.94714285714286|
|   Thailand|       40.40184210526316|
|  Hong Kong|       136.7086666666667|
+-----------+------------------------+
only showing top 20 rows



In [20]:
country_diversity_df = unique_products_per_country_df.join(
    average_revenue_per_product_per_country_df,
    on="Country",
    how="inner"
)

country_diversity_df.show()

+-----------+------------------+------------------------+
|    Country|UniqueProductsSold|AverageRevenuePerProduct|
+-----------+------------------+------------------------+
|     Sweden|               432|      123.90136574074074|
|  Singapore|               110|                  36.707|
|    Germany|              1457|       139.0918332189432|
|        RSA|               101|       37.64366336633663|
|     France|              1297|      114.16141094834241|
|     Greece|               419|       34.21400954653937|
|    Belgium|               481|       51.05422037422038|
|    Finland|               226|       32.65247787610619|
|      Malta|               162|       33.16975308641973|
|    Nigeria|                30|       4.679666666666666|
|Unspecified|               245|        25.2498775510204|
|      Italy|               510|      29.543490196078455|
|       EIRE|              2047|      186.38651685393253|
|  Lithuania|               113|       43.29805309734513|
|     Norway| 

In [21]:
from pyspark.sql import functions as F

sales_df = sales_df.withColumn(
    "TransactionBucket",
    F.when(F.col("Revenue") < 50, "Small")
    .when((F.col("Revenue") >= 50) & (F.col("Revenue") <= 200), "Medium")
    .otherwise("Large")
)

sales_df.show()

+-------+---------+--------------------+--------+-------------------+-----+-----------+--------------+------------------+-----------------+
|Invoice|StockCode|         Description|Quantity|        InvoiceDate|Price|Customer ID|       Country|           Revenue|TransactionBucket|
+-------+---------+--------------------+--------+-------------------+-----+-----------+--------------+------------------+-----------------+
| 489434|    85048|15CM CHRISTMAS GL...|      12|2009-12-01 07:45:00| 6.95|    13085.0|United Kingdom|              83.4|           Medium|
| 489434|   79323P|  PINK CHERRY LIGHTS|      12|2009-12-01 07:45:00| 6.75|    13085.0|United Kingdom|              81.0|           Medium|
| 489434|   79323W| WHITE CHERRY LIGHTS|      12|2009-12-01 07:45:00| 6.75|    13085.0|United Kingdom|              81.0|           Medium|
| 489434|    22041|"RECORD FRAME 7""...|      48|2009-12-01 07:45:00|  2.1|    13085.0|United Kingdom|100.80000000000001|           Medium|
| 489434|    21232|S

In [22]:
from pyspark.sql import functions as F

transaction_summary_df = sales_df.groupBy("TransactionBucket").agg(
    F.count("Invoice").alias("NumberOfTransactions"),
    F.avg("Quantity").alias("AverageQuantity")
)

transaction_summary_df.show()

+-----------------+--------------------+-----------------+
|TransactionBucket|NumberOfTransactions|  AverageQuantity|
+-----------------+--------------------+-----------------+
|           Medium|               28074|48.99244852888794|
|            Small|              482844| 6.43845631301207|
|            Large|                4815|287.9079958463136|
+-----------------+--------------------+-----------------+

