In [39]:
!pip install pyspark



In [44]:
from pyspark.sql import SparkSession

In [46]:
spark = SparkSession.builder.appName("Sales").master("local[*]").config("spark.network.timeout","600s") \
 .config("spark.executor.heartbeatInterval","60s") \
 .config("spark.executor.memory","4g") \
 .config("spark.driver,memory","2g") \
 .getOrCreate()


In [48]:
import pandas as pd
df1 = pd.read_csv("/content/products.csv")
df2 = pd.read_csv("/content/sales.csv")
df3 = pd.read_csv("/content/stores.csv")
df4 = pd.read_csv("/content/inventory.csv")

df1 = spark.createDataFrame(df1)
df2 = spark.createDataFrame(df2)
df3 = spark.createDataFrame(df3)
df4 = spark.createDataFrame(df4)

In [9]:
sales = df2.join(df1,on = "Product_ID",how = "left").drop(df1.Product_ID)\
       .join(df3,on="Store_ID",how="left").drop(df3.Store_ID)\
       .join(df4,on="Store_ID",how="left").drop(df4.Store_ID).drop(df4.Product_ID)
sales.show()

+--------+----------+-------+----------+-----+-------------+----------------+------------+-------------+--------------------+----------+--------------+---------------+-------------+
|Store_ID|Product_ID|Sale_ID|      Date|Units| Product_Name|Product_Category|Product_Cost|Product_Price|          Store_Name|Store_City|Store_Location|Store_Open_Date|Stock_On_Hand|
+--------+----------+-------+----------+-----+-------------+----------------+------------+-------------+--------------------+----------+--------------+---------------+-------------+
|       6|       8.0|      3|2022-01-01|  1.0|Deck Of Cards|           Games|      $3.99 |       $6.99 |Maven Toys Mexica...|  Mexicali|    Commercial|     2003-12-13|            3|
|       6|       8.0|      3|2022-01-01|  1.0|Deck Of Cards|           Games|      $3.99 |       $6.99 |Maven Toys Mexica...|  Mexicali|    Commercial|     2003-12-13|            8|
|       6|       8.0|      3|2022-01-01|  1.0|Deck Of Cards|           Games|      $3.99 |

In [10]:
sales.printSchema()

root
 |-- Store_ID: long (nullable = true)
 |-- Product_ID: double (nullable = true)
 |-- Sale_ID: long (nullable = true)
 |-- Date: string (nullable = true)
 |-- Units: double (nullable = true)
 |-- Product_Name: string (nullable = true)
 |-- Product_Category: string (nullable = true)
 |-- Product_Cost: string (nullable = true)
 |-- Product_Price: string (nullable = true)
 |-- Store_Name: string (nullable = true)
 |-- Store_City: string (nullable = true)
 |-- Store_Location: string (nullable = true)
 |-- Store_Open_Date: string (nullable = true)
 |-- Stock_On_Hand: long (nullable = true)



In [11]:
from pyspark.sql.functions import to_date
sales = sales.withColumn("Date",to_date("Date","yyyy-MM-dd"))\
        .withColumn("Store_Open_Date",to_date("Store_Open_Date","yyyy-MM-dd"))

In [12]:
sales.printSchema()

root
 |-- Store_ID: long (nullable = true)
 |-- Product_ID: double (nullable = true)
 |-- Sale_ID: long (nullable = true)
 |-- Date: date (nullable = true)
 |-- Units: double (nullable = true)
 |-- Product_Name: string (nullable = true)
 |-- Product_Category: string (nullable = true)
 |-- Product_Cost: string (nullable = true)
 |-- Product_Price: string (nullable = true)
 |-- Store_Name: string (nullable = true)
 |-- Store_City: string (nullable = true)
 |-- Store_Location: string (nullable = true)
 |-- Store_Open_Date: date (nullable = true)
 |-- Stock_On_Hand: long (nullable = true)



In [13]:
from pyspark.sql.functions import regexp_replace, col
from pyspark.sql.types import DoubleType

sales = sales.withColumn("Product_Cost",regexp_replace(col("Product_Cost"),"\\$","").cast(DoubleType()))
sales = sales.withColumn("Product_Price",regexp_replace(col("Product_Price"),"\\$","").cast(DoubleType()))

In [14]:
sales.printSchema()

root
 |-- Store_ID: long (nullable = true)
 |-- Product_ID: double (nullable = true)
 |-- Sale_ID: long (nullable = true)
 |-- Date: date (nullable = true)
 |-- Units: double (nullable = true)
 |-- Product_Name: string (nullable = true)
 |-- Product_Category: string (nullable = true)
 |-- Product_Cost: double (nullable = true)
 |-- Product_Price: double (nullable = true)
 |-- Store_Name: string (nullable = true)
 |-- Store_City: string (nullable = true)
 |-- Store_Location: string (nullable = true)
 |-- Store_Open_Date: date (nullable = true)
 |-- Stock_On_Hand: long (nullable = true)



In [15]:
from pyspark.sql.functions import col
sales=sales.withColumn("Units",sales["Units"].cast("int"))

sales = sales.withColumn("Total_product_cost",col("Product_Cost")*col("Units"))\
        .withColumn("Total_product_price",col("Product_Price")*col("Units"))\
        .withColumn("Profit",col("Total_product_price")-col("Total_product_cost"))
sales.show()

+--------+----------+-------+----------+-----+-------------+----------------+------------+-------------+--------------------+----------+--------------+---------------+-------------+------------------+-------------------+------+
|Store_ID|Product_ID|Sale_ID|      Date|Units| Product_Name|Product_Category|Product_Cost|Product_Price|          Store_Name|Store_City|Store_Location|Store_Open_Date|Stock_On_Hand|Total_product_cost|Total_product_price|Profit|
+--------+----------+-------+----------+-----+-------------+----------------+------------+-------------+--------------------+----------+--------------+---------------+-------------+------------------+-------------------+------+
|      26|       8.0|   2838|2022-01-04|    1|Deck Of Cards|           Games|        3.99|         6.99|Maven Toys Campec...|  Campeche|    Commercial|     2010-09-15|            8|              3.99|               6.99|   3.0|
|      26|       8.0|   2838|2022-01-04|    1|Deck Of Cards|           Games|        3.9

In [23]:
from pyspark.sql.functions import col,sum,round
sales.groupBy("Product_Name").agg(round(sum("Profit"),2).alias("Profit"))\
                              .orderBy(col("Profit").asc()).show()

+--------------------+---------+
|        Product_Name|   Profit|
+--------------------+---------+
|                NULL|     NULL|
|    Classic Dominoes| 289910.0|
|       Uno Card Game| 343756.0|
|    Chutes & Ladders| 372531.0|
|          Teddy Bear| 377338.0|
|Supersoaker Water...| 606417.0|
|  Foam Disk Launcher| 625845.0|
|            Monopoly| 640926.0|
|     PlayDoh Toolkit| 678853.0|
|     PlayDoh Playset| 741600.0|
|            Dino Egg|1073402.0|
|      Mr. Potatohead|1237730.0|
|   Hot Wheels 5-Pack|1291860.0|
|Mini Basketball Hoop|1403008.0|
|           Toy Robot|1719735.0|
|          Plush Pony|1776181.0|
|        Splash Balls|1809767.0|
|        Rubik's Cube|2715898.0|
|               Jenga|2877511.0|
|         PlayDoh Can|2958820.0|
+--------------------+---------+
only showing top 20 rows



In [26]:
from pyspark.sql.functions import col,sum,round,date_format
sales=sales.withColumn("Month",date_format("Date","MM"))

sales.groupBy("Month","Product_Category").agg(round(sum("Total_product_price"),2).alias("Total_sales"))\
      .orderBy("Month","Product_Category").show()




+-----+-----------------+-------------+
|Month| Product_Category|  Total_sales|
+-----+-----------------+-------------+
|   01|     Art & Crafts|   7351754.18|
|   01|      Electronics|   7839395.79|
|   01|            Games|   7659510.59|
|   01|Sports & Outdoors|   5298263.17|
|   01|             Toys|1.294887649E7|
|   02|     Art & Crafts|   7048230.67|
|   02|      Electronics|   7411440.68|
|   02|            Games|   7112806.25|
|   02|Sports & Outdoors|   5108601.07|
|   02|             Toys|1.351156213E7|
|   03|     Art & Crafts|   8200054.32|
|   03|      Electronics|   7461885.55|
|   03|            Games|   7071606.06|
|   03|Sports & Outdoors|   6955300.05|
|   03|             Toys|1.711144597E7|
|   04|     Art & Crafts|   9466375.08|
|   04|      Electronics|    6915165.0|
|   04|            Games|   7139199.29|
|   04|Sports & Outdoors|   7448857.32|
|   04|             Toys|1.712230229E7|
+-----+-----------------+-------------+
only showing top 20 rows



In [30]:
from pyspark.sql.functions import col,round,sum
sales.groupBy("Store_City").agg(round(sum("Total_product_price"),2).alias("Total_sales"))\
    .show()

+----------------+-------------+
|      Store_City|  Total_sales|
+----------------+-------------+
|    Villahermosa|   8238483.75|
|         Morelia|    9012537.1|
|         Durango|   5872245.96|
|          Toluca|1.692824366E7|
|      Guanajuato|2.616149447E7|
|       Chihuahua|  1.6648093E7|
|       Monterrey|3.707310264E7|
|        Culiacan|   7859502.35|
|      Cuernavaca|   5893508.18|
|        Mexicali|1.756383226E7|
|      Hermosillo|2.787844687E7|
|Tuxtla Gutierrez|   6146869.58|
|        Campeche|1.693678735E7|
|     Guadalajara|3.698997199E7|
|    Chilpancingo|   6438659.17|
|          Oaxaca|    6379950.7|
|        Chetumal|    8389959.2|
|Cuidad de Mexico|4.898339831E7|
|  Aguascalientes|   7690044.25|
| Ciudad Victoria|   7828114.42|
+----------------+-------------+
only showing top 20 rows



In [31]:
sales.filter(sales.Store_City == "Guadalajara").groupBy("Product_Name").sum("Profit").sort(sum("Profit")).show()

+--------------------+------------------+
|        Product_Name|       sum(Profit)|
+--------------------+------------------+
|    Chutes & Ladders|            6300.0|
|            Monopoly|12599.999999999998|
|    Classic Dominoes|           13930.0|
|       Uno Card Game|           17360.0|
|Mini Basketball Hoop| 41999.99999999999|
|          Teddy Bear|           43906.0|
|Supersoaker Water...|           45912.0|
|  Foam Disk Launcher|           57738.0|
|     PlayDoh Playset|           60068.0|
|     PlayDoh Toolkit|           72318.0|
|            Dino Egg|           86456.0|
|      Mr. Potatohead|          109755.0|
|  Mini Ping Pong Set|          111852.0|
|        Splash Balls|          112134.0|
|   Hot Wheels 5-Pack|          138948.0|
|           Toy Robot|          165385.0|
|          Plush Pony|168310.99999999997|
|               Jenga|          235774.0|
|          Magic Sand|          251840.0|
|         PlayDoh Can|254407.00000000003|
+--------------------+------------

In [37]:
from pyspark.sql.functions import col,sum
sales.groupBy("Product_Name").agg(sum("Units")).alias("Total_units_sold").show()

+--------------------+----------+
|        Product_Name|sum(Units)|
+--------------------+----------+
|Supersoaker Water...|    202139|
|            Monopoly|    106821|
|            Dart Gun|    937457|
|  Mini Ping Pong Set|    999668|
|          Teddy Bear|    188669|
|           Colorbuds|   3156583|
|                NULL|         0|
|       Action Figure|   1735956|
|           Toy Robot|    343947|
|       Deck Of Cards|   2514225|
|      Mr. Potatohead|    247546|
|     PlayDoh Toolkit|    678853|
|  Foam Disk Launcher|    208615|
|    Chutes & Ladders|    124177|
|    Classic Dominoes|    144955|
|       Etch A Sketch|    341908|
|       Glass Marbles|   1145565|
|         Lego Bricks|   1761700|
|            Nerf Gun|    747637|
|     Kids Makeup Kit|    724866|
+--------------------+----------+
only showing top 20 rows



In [38]:
sales.write.csv("sales_data.csv",header = True)