In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType

In [2]:
spark = (SparkSession.builder.appName("SalesAnalytics").getOrCreate())

# **Preparacion de los datos**

In [3]:
schema = StructType([
    StructField("Order ID", StringType(), True),
    StructField("Product", StringType(), True),
    StructField("Quantity Ordered", StringType(), True),
    StructField("Price Each", StringType(), True),
    StructField("Order Date", StringType(), True),
    StructField("Purchase Adress", StringType(), True),
])

In [4]:
sales_data_fpath = "./salesdata"
sales_raw_df = (
spark.read.format("csv")
       .option("header", True)
       .schema(schema)
       .load(sales_data_fpath)
)

In [5]:
sales_raw_df.show(10)

+--------+--------------------+----------------+----------+--------------+--------------------+
|Order ID|             Product|Quantity Ordered|Price Each|    Order Date|     Purchase Adress|
+--------+--------------------+----------------+----------+--------------+--------------------+
|  295665|  Macbook Pro Laptop|               1|      1700|12/30/19 00:01|136 Church St, Ne...|
|  295666|  LG Washing Machine|               1|     600.0|12/29/19 07:03|562 2nd St, New Y...|
|  295667|USB-C Charging Cable|               1|     11.95|12/12/19 18:21|277 Main St, New ...|
|  295668|    27in FHD Monitor|               1|    149.99|12/22/19 15:13|410 6th St, San F...|
|  295669|USB-C Charging Cable|               1|     11.95|12/18/19 12:38|43 Hill St, Atlan...|
|  295670|AA Batteries (4-p...|               1|      3.84|12/31/19 22:58|200 Jefferson St,...|
|  295671|USB-C Charging Cable|               1|     11.95|12/16/19 15:10|928 12th St, Port...|
|  295672|USB-C Charging Cable|         

In [6]:
sales_raw_df.printSchema()

root
 |-- Order ID: string (nullable = true)
 |-- Product: string (nullable = true)
 |-- Quantity Ordered: string (nullable = true)
 |-- Price Each: string (nullable = true)
 |-- Order Date: string (nullable = true)
 |-- Purchase Adress: string (nullable = true)



## **Limpeza de los datos**

- Imperfeccciones de la data y registros null

In [7]:
from pyspark.sql.functions import col

In [8]:
sales_raw_df.filter(col("Order ID").isNull() == True).show(10)

+--------+-------+----------------+----------+----------+---------------+
|Order ID|Product|Quantity Ordered|Price Each|Order Date|Purchase Adress|
+--------+-------+----------------+----------+----------+---------------+
|    NULL|   NULL|            NULL|      NULL|      NULL|           NULL|
|    NULL|   NULL|            NULL|      NULL|      NULL|           NULL|
|    NULL|   NULL|            NULL|      NULL|      NULL|           NULL|
|    NULL|   NULL|            NULL|      NULL|      NULL|           NULL|
|    NULL|   NULL|            NULL|      NULL|      NULL|           NULL|
|    NULL|   NULL|            NULL|      NULL|      NULL|           NULL|
|    NULL|   NULL|            NULL|      NULL|      NULL|           NULL|
|    NULL|   NULL|            NULL|      NULL|      NULL|           NULL|
|    NULL|   NULL|            NULL|      NULL|      NULL|           NULL|
|    NULL|   NULL|            NULL|      NULL|      NULL|           NULL|
+--------+-------+----------------+---

In [9]:
sales_raw_df = sales_raw_df.na.drop("any")

In [10]:
sales_raw_df.filter(col("Order ID").isNull() == True).show(10)

+--------+-------+----------------+----------+----------+---------------+
|Order ID|Product|Quantity Ordered|Price Each|Order Date|Purchase Adress|
+--------+-------+----------------+----------+----------+---------------+
+--------+-------+----------------+----------+----------+---------------+



In [10]:
sales_raw_df.describe("Quantity Ordered", "Price Each").show()

+-------+------------------+-----------------+
|summary|  Quantity Ordered|       Price Each|
+-------+------------------+-----------------+
|  count|            186305|           186305|
|   mean|1.1243828986286637|184.3997347674861|
| stddev|0.4427926240286694|332.7313298843445|
|    min|                 1|           109.99|
|    max|  Quantity Ordered|       Price Each|
+-------+------------------+-----------------+



In [11]:
sales_raw_df.filter(col("Order ID") == "Order ID").show(10)

+--------+-------+----------------+----------+----------+----------------+
|Order ID|Product|Quantity Ordered|Price Each|Order Date| Purchase Adress|
+--------+-------+----------------+----------+----------+----------------+
|Order ID|Product|Quantity Ordered|Price Each|Order Date|Purchase Address|
|Order ID|Product|Quantity Ordered|Price Each|Order Date|Purchase Address|
|Order ID|Product|Quantity Ordered|Price Each|Order Date|Purchase Address|
|Order ID|Product|Quantity Ordered|Price Each|Order Date|Purchase Address|
|Order ID|Product|Quantity Ordered|Price Each|Order Date|Purchase Address|
|Order ID|Product|Quantity Ordered|Price Each|Order Date|Purchase Address|
|Order ID|Product|Quantity Ordered|Price Each|Order Date|Purchase Address|
|Order ID|Product|Quantity Ordered|Price Each|Order Date|Purchase Address|
|Order ID|Product|Quantity Ordered|Price Each|Order Date|Purchase Address|
|Order ID|Product|Quantity Ordered|Price Each|Order Date|Purchase Address|
+--------+-------+-------

In [12]:
sales_temp_df = sales_raw_df.distinct()

In [13]:
sales_temp_df.show(5)

+--------+--------------------+----------------+----------+--------------+--------------------+
|Order ID|             Product|Quantity Ordered|Price Each|    Order Date|     Purchase Adress|
+--------+--------------------+----------------+----------+--------------+--------------------+
|  295710|AAA Batteries (4-...|               1|      2.99|12/26/19 19:09|729 Pine St, Atla...|
|  295749|    Wired Headphones|               2|     11.99|12/04/19 05:24|983 Highland St, ...|
|  296168|     ThinkPad Laptop|               1|    999.99|12/29/19 07:26|810 Lincoln St, B...|
|  296698|    Wired Headphones|               1|     11.99|12/13/19 22:02|676 13th St, Atla...|
|  297024|  Macbook Pro Laptop|               1|      1700|12/15/19 15:16|869 9th St, San F...|
+--------+--------------------+----------------+----------+--------------+--------------------+
only showing top 5 rows



In [14]:
sales_temp_df.filter(col("Order ID") == "Order ID").show()

+--------+-------+----------------+----------+----------+----------------+
|Order ID|Product|Quantity Ordered|Price Each|Order Date| Purchase Adress|
+--------+-------+----------------+----------+----------+----------------+
|Order ID|Product|Quantity Ordered|Price Each|Order Date|Purchase Address|
+--------+-------+----------------+----------+----------+----------------+



## **Probar la solución al porblema**

In [15]:
sales_temp_df = sales_temp_df.filter(col("Order ID") != "Order ID")

In [16]:
sales_temp_df.filter(col("Order ID") == "Order ID").show(10)

+--------+-------+----------------+----------+----------+---------------+
|Order ID|Product|Quantity Ordered|Price Each|Order Date|Purchase Adress|
+--------+-------+----------------+----------+----------+---------------+
+--------+-------+----------------+----------+----------+---------------+



In [17]:
sales_temp_df.show(10, truncate=False)

+--------+------------------------+----------------+----------+--------------+--------------------------------------+
|Order ID|Product                 |Quantity Ordered|Price Each|Order Date    |Purchase Adress                       |
+--------+------------------------+----------------+----------+--------------+--------------------------------------+
|295710  |AAA Batteries (4-pack)  |1               |2.99      |12/26/19 19:09|729 Pine St, Atlanta, GA 30301        |
|295749  |Wired Headphones        |2               |11.99     |12/04/19 05:24|983 Highland St, Los Angeles, CA 90001|
|296168  |ThinkPad Laptop         |1               |999.99    |12/29/19 07:26|810 Lincoln St, Boston, MA 02215      |
|296698  |Wired Headphones        |1               |11.99     |12/13/19 22:02|676 13th St, Atlanta, GA 30301        |
|297024  |Macbook Pro Laptop      |1               |1700      |12/15/19 15:16|869 9th St, San Francisco, CA 94016   |
|297128  |ThinkPad Laptop         |1               |999.

In [18]:
sales_temp_df.describe().show()

+-------+------------------+------------+------------------+------------------+--------------+--------------------+
|summary|          Order ID|     Product|  Quantity Ordered|        Price Each|    Order Date|     Purchase Adress|
+-------+------------------+------------+------------------+------------------+--------------+--------------------+
|  count|            185686|      185686|            185686|            185686|        185686|              185686|
|   mean|230411.37622653297|        NULL|1.1245435843305365|184.51925546358373|          NULL|                NULL|
| stddev| 51511.71718332089|        NULL|0.4430687383832877|332.84383839005267|          NULL|                NULL|
|    min|            141234|20in Monitor|                 1|            109.99|01/01/19 03:07|1 11th St, Atlant...|
|    max|            319670|      iPhone|                 9|            999.99|12/31/19 23:53|999 Wilson St, Sa...|
+-------+------------------+------------+------------------+------------

# **Transformaciones**

## **Extraer la ciudad con la dirección**

In [19]:
from pyspark.sql.functions import split

In [20]:
sales_temp_df.select("Purchase Adress").show(10, False)

+--------------------------------------+
|Purchase Adress                       |
+--------------------------------------+
|729 Pine St, Atlanta, GA 30301        |
|983 Highland St, Los Angeles, CA 90001|
|810 Lincoln St, Boston, MA 02215      |
|676 13th St, Atlanta, GA 30301        |
|869 9th St, San Francisco, CA 94016   |
|90 14th St, Los Angeles, CA 90001     |
|357 13th St, Atlanta, GA 30301        |
|949 Adams St, Atlanta, GA 30301       |
|702 Wilson St, New York City, NY 10001|
|151 14th St, Boston, MA 02215         |
+--------------------------------------+
only showing top 10 rows



In [21]:
sales_temp_df.select("Purchase Adress", split(col("Purchase Adress"), ",")).show(10, False)

+--------------------------------------+------------------------------------------+
|Purchase Adress                       |split(Purchase Adress, ,, -1)             |
+--------------------------------------+------------------------------------------+
|729 Pine St, Atlanta, GA 30301        |[729 Pine St,  Atlanta,  GA 30301]        |
|983 Highland St, Los Angeles, CA 90001|[983 Highland St,  Los Angeles,  CA 90001]|
|810 Lincoln St, Boston, MA 02215      |[810 Lincoln St,  Boston,  MA 02215]      |
|676 13th St, Atlanta, GA 30301        |[676 13th St,  Atlanta,  GA 30301]        |
|869 9th St, San Francisco, CA 94016   |[869 9th St,  San Francisco,  CA 94016]   |
|90 14th St, Los Angeles, CA 90001     |[90 14th St,  Los Angeles,  CA 90001]     |
|357 13th St, Atlanta, GA 30301        |[357 13th St,  Atlanta,  GA 30301]        |
|949 Adams St, Atlanta, GA 30301       |[949 Adams St,  Atlanta,  GA 30301]       |
|702 Wilson St, New York City, NY 10001|[702 Wilson St,  New York City,  NY 

In [22]:
sales_temp_df.select("Purchase Adress", split(col("Purchase Adress"), ",").getItem(1)).show(10, False)

+--------------------------------------+--------------------------------+
|Purchase Adress                       |split(Purchase Adress, ,, -1)[1]|
+--------------------------------------+--------------------------------+
|729 Pine St, Atlanta, GA 30301        | Atlanta                        |
|983 Highland St, Los Angeles, CA 90001| Los Angeles                    |
|810 Lincoln St, Boston, MA 02215      | Boston                         |
|676 13th St, Atlanta, GA 30301        | Atlanta                        |
|869 9th St, San Francisco, CA 94016   | San Francisco                  |
|90 14th St, Los Angeles, CA 90001     | Los Angeles                    |
|357 13th St, Atlanta, GA 30301        | Atlanta                        |
|949 Adams St, Atlanta, GA 30301       | Atlanta                        |
|702 Wilson St, New York City, NY 10001| New York City                  |
|151 14th St, Boston, MA 02215         | Boston                         |
+-------------------------------------

In [23]:
sales_temp_df.select("Purchase Adress", split(col("Purchase Adress"), ",").getItem(2)).show(10, False)

+--------------------------------------+--------------------------------+
|Purchase Adress                       |split(Purchase Adress, ,, -1)[2]|
+--------------------------------------+--------------------------------+
|729 Pine St, Atlanta, GA 30301        | GA 30301                       |
|983 Highland St, Los Angeles, CA 90001| CA 90001                       |
|810 Lincoln St, Boston, MA 02215      | MA 02215                       |
|676 13th St, Atlanta, GA 30301        | GA 30301                       |
|869 9th St, San Francisco, CA 94016   | CA 94016                       |
|90 14th St, Los Angeles, CA 90001     | CA 90001                       |
|357 13th St, Atlanta, GA 30301        | GA 30301                       |
|949 Adams St, Atlanta, GA 30301       | GA 30301                       |
|702 Wilson St, New York City, NY 10001| NY 10001                       |
|151 14th St, Boston, MA 02215         | MA 02215                       |
+-------------------------------------

In [24]:
sales_temp_df = (sales_temp_df.withColumn("City", split(col("Purchase Adress"), ",").getItem(1))
                              .withColumn("State", split(split(col("Purchase Adress"), ",").getItem(2),' ').getItem(1)))

In [25]:
sales_temp_df.show(10, False)

+--------+------------------------+----------------+----------+--------------+--------------------------------------+--------------+-----+
|Order ID|Product                 |Quantity Ordered|Price Each|Order Date    |Purchase Adress                       |City          |State|
+--------+------------------------+----------------+----------+--------------+--------------------------------------+--------------+-----+
|295710  |AAA Batteries (4-pack)  |1               |2.99      |12/26/19 19:09|729 Pine St, Atlanta, GA 30301        | Atlanta      |GA   |
|295749  |Wired Headphones        |2               |11.99     |12/04/19 05:24|983 Highland St, Los Angeles, CA 90001| Los Angeles  |CA   |
|296168  |ThinkPad Laptop         |1               |999.99    |12/29/19 07:26|810 Lincoln St, Boston, MA 02215      | Boston       |MA   |
|296698  |Wired Headphones        |1               |11.99     |12/13/19 22:02|676 13th St, Atlanta, GA 30301        | Atlanta      |GA   |
|297024  |Macbook Pro Lapto

## **Renombrar y cambiar tipos**

In [26]:
from pyspark.sql.functions import to_timestamp
from pyspark.sql.types import IntegerType, FloatType

In [27]:
sales_temp_df = (sales_temp_df.withColumn("OrderId", col("Order ID").cast(IntegerType()))
                              .withColumn("Quantity", col("Quantity Ordered").cast(IntegerType()))
                              .withColumn("Price", col("Price Each").cast(FloatType()))
                              .withColumn("OrderDate", to_timestamp(col("Order date"), "MM/dd/yy HH:mm"))
                              .withColumnRenamed("Purchase Adress", "StoreAdress")
                              .drop("Order Id")
                              .drop("Quantity Ordered")
                              .drop("Price Each")
                              .drop("Purchase Adress"))

In [28]:
sales_temp_df.show(10, False)

+------------------------+--------------+--------------------------------------+--------------+-----+-------+--------+------+-------------------+
|Product                 |Order Date    |StoreAdress                           |City          |State|OrderId|Quantity|Price |OrderDate          |
+------------------------+--------------+--------------------------------------+--------------+-----+-------+--------+------+-------------------+
|AAA Batteries (4-pack)  |12/26/19 19:09|729 Pine St, Atlanta, GA 30301        | Atlanta      |GA   |295710 |1       |2.99  |2019-12-26 19:09:00|
|Wired Headphones        |12/04/19 05:24|983 Highland St, Los Angeles, CA 90001| Los Angeles  |CA   |295749 |2       |11.99 |2019-12-04 05:24:00|
|ThinkPad Laptop         |12/29/19 07:26|810 Lincoln St, Boston, MA 02215      | Boston       |MA   |296168 |1       |999.99|2019-12-29 07:26:00|
|Wired Headphones        |12/13/19 22:02|676 13th St, Atlanta, GA 30301        | Atlanta      |GA   |296698 |1       |11.99 

In [29]:
sales_temp_df.printSchema()

root
 |-- Product: string (nullable = true)
 |-- Order Date: string (nullable = true)
 |-- StoreAdress: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- OrderId: integer (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- Price: float (nullable = true)
 |-- OrderDate: timestamp (nullable = true)



## **Agragar nuevas columnas: mes y año**

In [30]:
from pyspark.sql.functions import year,month

In [35]:
sales_temp_df = (sales_temp_df.withColumn("ReportYear", year(col("OrderDate")))
                              .withColumn("Month", month(col("OrderDate"))))

In [36]:
sales_temp_df.show(10)

+--------------------+--------------+--------------------+--------------+-----+-------+--------+------+-------------------+----------+----+-----+
|             Product|    Order Date|         StoreAdress|          City|State|OrderId|Quantity| Price|          OrderDate|ReportYear|Mont|Month|
+--------------------+--------------+--------------------+--------------+-----+-------+--------+------+-------------------+----------+----+-----+
|AAA Batteries (4-...|12/26/19 19:09|729 Pine St, Atla...|       Atlanta|   GA| 295710|       1|  2.99|2019-12-26 19:09:00|      2019|  12|   12|
|    Wired Headphones|12/04/19 05:24|983 Highland St, ...|   Los Angeles|   CA| 295749|       2| 11.99|2019-12-04 05:24:00|      2019|  12|   12|
|     ThinkPad Laptop|12/29/19 07:26|810 Lincoln St, B...|        Boston|   MA| 296168|       1|999.99|2019-12-29 07:26:00|      2019|  12|   12|
|    Wired Headphones|12/13/19 22:02|676 13th St, Atla...|       Atlanta|   GA| 296698|       1| 11.99|2019-12-13 22:02:00| 

## **Escribir el dataFrame en un parquet**

In [37]:
sales_final_df = sales_temp_df.select("OrderId", "Product", "Quantity", "Price", "OrderDate", "StoreAdress", "City", "Reportyear", "Month")

In [38]:
sales_final_df.show(10)

+-------+--------------------+--------+------+-------------------+--------------------+--------------+----------+-----+
|OrderId|             Product|Quantity| Price|          OrderDate|         StoreAdress|          City|Reportyear|Month|
+-------+--------------------+--------+------+-------------------+--------------------+--------------+----------+-----+
| 295710|AAA Batteries (4-...|       1|  2.99|2019-12-26 19:09:00|729 Pine St, Atla...|       Atlanta|      2019|   12|
| 295749|    Wired Headphones|       2| 11.99|2019-12-04 05:24:00|983 Highland St, ...|   Los Angeles|      2019|   12|
| 296168|     ThinkPad Laptop|       1|999.99|2019-12-29 07:26:00|810 Lincoln St, B...|        Boston|      2019|   12|
| 296698|    Wired Headphones|       1| 11.99|2019-12-13 22:02:00|676 13th St, Atla...|       Atlanta|      2019|   12|
| 297024|  Macbook Pro Laptop|       1|1700.0|2019-12-15 15:16:00|869 9th St, San F...| San Francisco|      2019|   12|
| 297128|     ThinkPad Laptop|       1|9

In [39]:
output_path = './data/output/sales'
sales_final_df.write.mode("overwrite").partitionBy("ReportYear", "Month").parquet(output_path)