In [49]:
from pyspark.sql import SparkSession

# Obtain the notebook's Spark session (getOrCreate reuses an active one if present).
spark = (
    SparkSession.builder
    .appName("s8a-analitica-ventas")
    .getOrCreate()
)

In [50]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

# Explicit schema for the sales CSV files, so the numeric columns are typed on read.
# Every field is declared nullable (True). The files contain rows that come back entirely
# null (presumably blank lines) and repeated header lines whose numeric fields cannot be
# parsed and are read as null. Spark also marks every field of a file-based source as
# nullable regardless of what is declared (printSchema reports nullable = true for all
# fields), so the previous False/True mix was misleading rather than enforced.
esquema = StructType([
    StructField("Order ID", IntegerType(), True),
    StructField("Product", StringType(), True),
    StructField("Quantity Ordered", IntegerType(), True),
    StructField("Price Each", DoubleType(), True),
    StructField("Order Date", StringType(), True),
    StructField("Purchase Address", StringType(), True),
])

In [51]:
# Reading with the explicit schema is much faster (per the original author's note).
df = spark.read.csv(
    "salesdata/*.csv",
    schema=esquema,
    header=True,
    sep=",",
)

df.printSchema()

root
 |-- Order ID: integer (nullable = true)
 |-- Product: string (nullable = true)
 |-- Quantity Ordered: integer (nullable = true)
 |-- Price Each: double (nullable = true)
 |-- Order Date: string (nullable = true)
 |-- Purchase Address: string (nullable = true)



In [52]:
# Summary statistics (count, mean, stddev, min, max) per column. The max of the
# text columns is their header text, which reveals header lines repeated inside the files.
df.describe().show(n=20, truncate=True)



+-------+------------------+------------+------------------+------------------+--------------+--------------------+
|summary|          Order ID|     Product|  Quantity Ordered|        Price Each|    Order Date|    Purchase Address|
+-------+------------------+------------+------------------+------------------+--------------+--------------------+
|  count|            185950|      186305|            185950|            185950|        186305|              186305|
|   mean| 230417.5693788653|        null|1.1243828986286637|184.39973476747707|          null|                null|
| stddev|51512.737109995265|        null|0.4427926240286704| 332.7313298843439|          null|                null|
|    min|            141234|20in Monitor|                 1|              2.99|01/01/19 03:07|1 11th St, Atlant...|
|    max|            319670|      iPhone|                 9|            1700.0|    Order Date|    Purchase Address|
+-------+------------------+------------+------------------+------------

                                                                                

In [53]:
# Data cleaning

# 1. Remove rows that contain nulls

# 2. Remove the header lines repeated inside the CSV files

In [54]:
# Rename the columns to names without spaces (applied in this order).
nuevosNombres = {
    "Order ID": "OrderID",
    "Quantity Ordered": "Quantity",
    "Price Each": "Price",
    "Order Date": "OrderDate",
    "Purchase Address": "OrderAddress",
}
ventas = df
for nombreOriginal, nombreNuevo in nuevosNombres.items():
    ventas = ventas.withColumnRenamed(nombreOriginal, nombreNuevo)

In [55]:
# Inspect the rows whose OrderID is null. The output shows both fully null rows and
# repeated header rows (Product == "Product"). `col` is also used by later cells.
from pyspark.sql.functions import col

filasSinOrderID = ventas.where(col("OrderID").isNull())
filasSinOrderID.show()

+-------+-------+--------+-----+----------+----------------+
|OrderID|Product|Quantity|Price| OrderDate|    OrderAddress|
+-------+-------+--------+-----+----------+----------------+
|   null|Product|    null| null|Order Date|Purchase Address|
|   null|   null|    null| null|      null|            null|
|   null|   null|    null| null|      null|            null|
|   null|   null|    null| null|      null|            null|
|   null|Product|    null| null|Order Date|Purchase Address|
|   null|Product|    null| null|Order Date|Purchase Address|
|   null|   null|    null| null|      null|            null|
|   null|   null|    null| null|      null|            null|
|   null|   null|    null| null|      null|            null|
|   null|   null|    null| null|      null|            null|
|   null|Product|    null| null|Order Date|Purchase Address|
|   null|   null|    null| null|      null|            null|
|   null|   null|    null| null|      null|            null|
|   null|   null|    nul

In [56]:
# Drop every row that has a null in any column. Besides the fully null rows, this also
# removes the header lines repeated inside the CSV files: with the explicit schema their
# numeric fields (OrderID, Quantity, Price) cannot be parsed and are read as null.
from functools import reduce
from operator import or_

ventasSinNulos = ventas.na.drop("any")

# Verify across ALL columns (checking only OrderID would miss nulls elsewhere):
# no remaining row may have a null in any column.
algunNulo = reduce(or_, [col(c).isNull() for c in ventasSinNulos.columns])
filasConNulos = ventasSinNulos.filter(algunNulo)
filasConNulos.show()
assert filasConNulos.limit(1).count() == 0, "rows with nulls remain after na.drop"

+-------+-------+--------+-----+---------+------------+
|OrderID|Product|Quantity|Price|OrderDate|OrderAddress|
+-------+-------+--------+-----+---------+------------+
+-------+-------+--------+-----+---------+------------+



In [57]:
# Check for header lines repeated inside the CSV files. Only the string columns are
# compared with their header text. OrderID is an integer column, so comparing it with
# the text "Order ID" implicitly casts the literal to a number (null), and that
# predicate can never be true. Because the header rows' numeric fields were read as
# null, na.drop("any") should already have removed them, so this should show no rows.
encabezados = ventasSinNulos.filter(
    (col("Product") == "Product")
    | (col("OrderDate") == "Order Date")
    | (col("OrderAddress") == "Purchase Address")
)
encabezados.show()

+-------+-------+--------+-----+---------+------------+
|OrderID|Product|Quantity|Price|OrderDate|OrderAddress|
+-------+-------+--------+-----+---------+------------+
+-------+-------+--------+-----+---------+------------+





In [58]:
# Look at the full address text (no truncation) to decide how to extract city and state.
direcciones = ventasSinNulos.select("OrderAddress")
direcciones.show(n=10, truncate=False)

+-----------------------------------------+
|OrderAddress                             |
+-----------------------------------------+
|136 Church St, New York City, NY 10001   |
|562 2nd St, New York City, NY 10001      |
|277 Main St, New York City, NY 10001     |
|410 6th St, San Francisco, CA 94016      |
|43 Hill St, Atlanta, GA 30301            |
|200 Jefferson St, New York City, NY 10001|
|928 12th St, Portland, OR 97035          |
|813 Hickory St, Dallas, TX 75001         |
|718 Wilson St, Dallas, TX 75001          |
|77 7th St, Dallas, TX 75001              |
+-----------------------------------------+
only showing top 10 rows



In [59]:
# Addresses look like "<street>, <city>, <STATE> <zip>". Splitting on commas, the city
# is the middle part and the state is the first word (two uppercase letters) of the third.
from pyspark.sql.functions import split, trim

partesDireccion = split(col("OrderAddress"), ",")
ventasCiudad = ventasSinNulos.withColumn("City", trim(partesDireccion[1]))
# Trim the third part before splitting on spaces so the state is always element [0].
# Indexing [1] on the untrimmed text only worked because of the single space after the comma.
ventasEstado = ventasCiudad.withColumn(
    "State", split(trim(partesDireccion[2]), " ")[0]
)
ventasEstado.show()

+-------+--------------------+--------+------+--------------+--------------------+-------------+-----+
|OrderID|             Product|Quantity| Price|     OrderDate|        OrderAddress|         City|State|
+-------+--------------------+--------+------+--------------+--------------------+-------------+-----+
| 295665|  Macbook Pro Laptop|       1|1700.0|12/30/19 00:01|136 Church St, Ne...|New York City|   NY|
| 295666|  LG Washing Machine|       1| 600.0|12/29/19 07:03|562 2nd St, New Y...|New York City|   NY|
| 295667|USB-C Charging Cable|       1| 11.95|12/12/19 18:21|277 Main St, New ...|New York City|   NY|
| 295668|    27in FHD Monitor|       1|149.99|12/22/19 15:13|410 6th St, San F...|San Francisco|   CA|
| 295669|USB-C Charging Cable|       1| 11.95|12/18/19 12:38|43 Hill St, Atlan...|      Atlanta|   GA|
| 295670|AA Batteries (4-p...|       1|  3.84|12/31/19 22:58|200 Jefferson St,...|New York City|   NY|
| 295671|USB-C Charging Cable|       1| 11.95|12/16/19 15:10|928 12th St,

In [61]:
# Convert OrderDate from text such as "12/30/19 00:01" into a timestamp.
# NOTE(review): this rebinds `ventas`, replacing the renamed frame from earlier cells.
from pyspark.sql.functions import to_timestamp

formatoFecha = "MM/dd/yy HH:mm"
ventas = ventasEstado.withColumn(
    "OrderDate", to_timestamp(ventasEstado["OrderDate"], formatoFecha)
)
ventas.show()

+-------+--------------------+--------+------+-------------------+--------------------+-------------+-----+
|OrderID|             Product|Quantity| Price|          OrderDate|        OrderAddress|         City|State|
+-------+--------------------+--------+------+-------------------+--------------------+-------------+-----+
| 295665|  Macbook Pro Laptop|       1|1700.0|2019-12-30 00:01:00|136 Church St, Ne...|New York City|   NY|
| 295666|  LG Washing Machine|       1| 600.0|2019-12-29 07:03:00|562 2nd St, New Y...|New York City|   NY|
| 295667|USB-C Charging Cable|       1| 11.95|2019-12-12 18:21:00|277 Main St, New ...|New York City|   NY|
| 295668|    27in FHD Monitor|       1|149.99|2019-12-22 15:13:00|410 6th St, San F...|San Francisco|   CA|
| 295669|USB-C Charging Cable|       1| 11.95|2019-12-18 12:38:00|43 Hill St, Atlan...|      Atlanta|   GA|
| 295670|AA Batteries (4-p...|       1|  3.84|2019-12-31 22:58:00|200 Jefferson St,...|New York City|   NY|
| 295671|USB-C Charging Cabl

In [63]:
# Derive Year and Month columns from the OrderDate timestamp (the partition keys used
# when writing to Parquet). withColumn replaces existing columns, so re-running is safe.
from pyspark.sql.functions import year, month

ventas = (
    ventas
    .withColumn("Year", year("OrderDate"))
    .withColumn("Month", month("OrderDate"))
)
ventas.show()

+-------+--------------------+--------+------+-------------------+--------------------+-------------+-----+----+-----+
|OrderID|             Product|Quantity| Price|          OrderDate|        OrderAddress|         City|State|Year|Month|
+-------+--------------------+--------+------+-------------------+--------------------+-------------+-----+----+-----+
| 295665|  Macbook Pro Laptop|       1|1700.0|2019-12-30 00:01:00|136 Church St, Ne...|New York City|   NY|2019|   12|
| 295666|  LG Washing Machine|       1| 600.0|2019-12-29 07:03:00|562 2nd St, New Y...|New York City|   NY|2019|   12|
| 295667|USB-C Charging Cable|       1| 11.95|2019-12-12 18:21:00|277 Main St, New ...|New York City|   NY|2019|   12|
| 295668|    27in FHD Monitor|       1|149.99|2019-12-22 15:13:00|410 6th St, San F...|San Francisco|   CA|2019|   12|
| 295669|USB-C Charging Cable|       1| 11.95|2019-12-18 12:38:00|43 Hill St, Atlan...|      Atlanta|   GA|2019|   12|
| 295670|AA Batteries (4-p...|       1|  3.84|20

In [64]:
# Write the sales as Parquet, partitioned into Year=.../Month=... sub-directories.
# Any previous output at this path is replaced.
rutaSalida = "./salesdataoutput/"
(
    ventas.write
    .partitionBy("Year", "Month")
    .mode("overwrite")
    .parquet(rutaSalida)
)

                                                                                

In [65]:
# Read only the 2019 data by loading its partition directory directly. Because the path
# points inside the Year partition, Year is not recovered as a column; only Month is
# discovered from the sub-directories. (To keep Year, read the root path with the
# "basePath" option, or read the root and filter on Year.)
ventas2019 = spark.read.format("parquet").load("./salesdataoutput/Year=2019")
ventas2019.printSchema()

root
 |-- OrderID: integer (nullable = true)
 |-- Product: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- Price: double (nullable = true)
 |-- OrderDate: timestamp (nullable = true)
 |-- OrderAddress: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Month: integer (nullable = true)



22/04/02 10:51:59 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 51755933 ms exceeds timeout 120000 ms
22/04/02 10:52:00 WARN SparkContext: Killing executors is not supported by current scheduler.
