In [5]:
# install java
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# install spark (change the version number if needed)
!wget -q https://archive.apache.org/dist/spark/spark-3.5.1/spark-3.5.1-bin-hadoop3.tgz

# unzip the spark file to the current folder
!tar xf spark-3.5.1-bin-hadoop3.tgz

# set your spark folder to your system path environment
import os

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.1-bin-hadoop3"

# install findspark using pip
!pip install -q findspark

# install pyspark
import findspark
findspark.init()
from pyspark.sql import SparkSession

 #Acess to google drive
from google.colab import drive
drive.mount('/content/drive')
import os
os.chdir('/content/drive/My Drive/Colab Notebooks')
os.listdir()

Mounted at /content/drive


['S07L02-Gustvao.ipynb',
 'Untitled0.ipynb',
 'S12L01.ipynb',
 'S12L02.ipynb',
 'Machine_Learning_Lab03_Action_Learning_Notebook',
 'S12L03.ipynb',
 'S12L04.ipynb',
 'Untitled1.ipynb',
 'Cópia de Spark Setup and Basic Data Processing in PySpark.ipynb',
 'Spark Setup and Basic Data Processing in PySpark.ipynb',
 'DATA',
 'Untitled2.ipynb']

In [6]:
# One local Spark session that uses every available CPU core.
spark = (
    SparkSession.builder
    .appName("Análise de Ventas de Zapatos")
    .master("local[*]")
    .getOrCreate()
)

Explanation:

*   This cell initializes a Spark session named "Análise de Ventas de Zapatos".
*   master("local[*]") specifies that Spark should run locally using all available cores.



In [10]:
# Load the raw ';'-separated sales file (with header row) and let Spark infer column types.
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .option("sep", ';')
    .csv('/content/drive/My Drive/Colab Notebooks/DATA/ventas_zapatos.csv')
)
df.printSchema()

root
 |-- Fecha: string (nullable = true)
 |-- id_cliente: string (nullable = true)
 |-- nombre: string (nullable = true)
 |-- apellido_1: string (nullable = true)
 |-- apellido_2: string (nullable = true)
 |-- id_prod: integer (nullable = true)
 |-- nombre_prod: string (nullable = true)
 |-- material_prod: string (nullable = true)
 |-- categoria_prod: string (nullable = true)
 |-- precio: integer (nullable = true)



Explanation:

*   A CSV file named ventas_zapatos.csv is read from Google Drive into a DataFrame df.
*   header=True specifies that the CSV file has a header row.
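*   inferSchema=True infers the data types of the columns. Note that Fecha is still inferred as a string, because its dd/MM/yyyy H:mm format is not recognized as a timestamp; it is converted in a later cell.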
*   sep=';' specifies that the CSV uses semicolons as separators.
*   printSchema() displays the schema of the DataFrame.






In [11]:
# Expose the raw sales DataFrame to Spark SQL under the name "ventas".
df.createOrReplaceTempView(name="ventas")

Explanation:

*   This creates a temporary view called "ventas" from the DataFrame df, allowing SQL queries to be run against it.




In [12]:
# Preview the sales view; show() prints at most 20 rows by default.
ventas_preview = spark.sql("SELECT * FROM VENTAS")
ventas_preview.show()

+----------------+---------------+----------+----------+----------+-------+--------------------+-------------+----------------+------+
|           Fecha|     id_cliente|    nombre|apellido_1|apellido_2|id_prod|         nombre_prod|material_prod|  categoria_prod|precio|
+----------------+---------------+----------+----------+----------+-------+--------------------+-------------+----------------+------+
| 20/04/2019 4:19|995052178892353|   Fabiola|    Méndez|     Ramos| 562972|   Pelusa mercenario|       Gamuza|      Zapatillas|    75|
| 10/02/2019 3:32|528848914440944|   Basileo|    Alonso|   Esteban| 949966| Reforma capitalista|         Goma|Zapatos de tacón|    50|
|08/05/2019 19:11| 53146869174343|    Míriam|  Martínez|   Esteban| 432964|    Zoca coxofemoral|       Gamuza|Zapato de vestir|    70|
|22/04/2019 15:24| 95327509355920|    Teresa|   Garrido|    Castro| 842352|Número Primo cham...|         Goma|         Botines|    50|
|02/01/2019 10:01|560935930327708|   Natalia|    Vargas

In [15]:
from pyspark.sql.functions import to_timestamp

# Turn the "fecha" text (e.g. "20/04/2019 4:19") into a real timestamp column.
formato_fecha = "dd/MM/yyyy H:mm"
fecha_ts = to_timestamp(df["fecha"], formato_fecha)
df = df.withColumn("fecha", fecha_ts)

# Re-register the view so SQL queries see the converted column, then inspect it.
df.createOrReplaceTempView("ventas")
spark.sql("SELECT * FROM VENTAS LIMIT 10").show()
df.printSchema()

+-------------------+---------------+---------+----------+----------+-------+--------------------+-------------+----------------+------+
|              fecha|     id_cliente|   nombre|apellido_1|apellido_2|id_prod|         nombre_prod|material_prod|  categoria_prod|precio|
+-------------------+---------------+---------+----------+----------+-------+--------------------+-------------+----------------+------+
|2019-04-20 04:19:00|995052178892353|  Fabiola|    Méndez|     Ramos| 562972|   Pelusa mercenario|       Gamuza|      Zapatillas|    75|
|2019-02-10 03:32:00|528848914440944|  Basileo|    Alonso|   Esteban| 949966| Reforma capitalista|         Goma|Zapatos de tacón|    50|
|2019-05-08 19:11:00| 53146869174343|   Míriam|  Martínez|   Esteban| 432964|    Zoca coxofemoral|       Gamuza|Zapato de vestir|    70|
|2019-04-22 15:24:00| 95327509355920|   Teresa|   Garrido|    Castro| 842352|Número Primo cham...|         Goma|         Botines|    50|
|2019-01-02 10:01:00|560935930327708|  Na

Explanation:

*   The to_timestamp function is imported.
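*   The "fecha" column is parsed from text such as "20/04/2019 4:19" into a timestamp, using the pattern dd/MM/yyyy H:mm.
*   The temporary view "ventas" is recreated with the updated DataFrame.
*   A SQL query with LIMIT 10 displays 10 rows from the "ventas" view (there is no ORDER BY, so which 10 rows are returned is not guaranteed).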
*   printSchema() shows the updated schema of the DataFrame.





In [27]:
# Build a DataMart: material dimension.
# Step 1: distinct materials from the sales view. The result must be bound to
# `materiales` (it was previously assigned to an unrelated name, which made the
# next line fail with NameError on a fresh kernel).
materiales = spark.sql("SELECT DISTINCT material_prod FROM VENTAS")
materiales.createOrReplaceTempView("materiales")
# Step 2: add a surrogate id numbered alphabetically by material and rename the
# column to `nombre`, replacing the "materiales" view.
spark.sql("SELECT row_number () over (order by material_prod) as id, material_prod as nombre from materiales").createOrReplaceTempView("materiales")
spark.sql("SELECT * FROM MATERIALES").show()

+---+----------+
| id|    nombre|
+---+----------+
|  1|     Cuero|
|  2|    Gamuza|
|  3|      Goma|
|  4|Sintéticos|
|  5|      Tela|
+---+----------+



Explanation:

*   A SQL query selects the distinct material_prod values from the "ventas" view.
*   The result is registered as a temporary view called "materiales".
*   A second SQL query assigns a sequential ID to each material (ordered alphabetically by material_prod) and renames material_prod to nombre, replacing the "materiales" view.
*   The contents of the "materiales" view are displayed.


In [36]:
# Category dimension: distinct categories first...
categoria = spark.sql("SELECT DISTINCT categoria_prod FROM VENTAS")
categoria.createOrReplaceTempView("categoria")

# ...then number them alphabetically and expose them as (id, nombre) under the same view name.
categoria_numerada = spark.sql("SELECT row_number () over (order by categoria_prod) as id, categoria_prod as nombre from categoria")
categoria_numerada.createOrReplaceTempView("categoria")

spark.sql("SELECT * FROM CATEGORIA").show()

+---+----------------+
| id|          nombre|
+---+----------------+
|  1|         Botines|
|  2|      Zapatillas|
|  3|Zapato de vestir|
|  4| Zapatos de agua|
|  5|Zapatos de tacón|
+---+----------------+



Explanation:

*   A SQL query selects distinct categoria_prod values from the "ventas" view, creating a DataFrame categoria.
*   A temporary view "categoria" is created from categoria.
*   A new SQL query assigns a unique ID to each categoria_prod and renames it to nombre, updating the "categoria" view.
*   The contents of the "categoria" view are displayed.

In [46]:
# Product dimension: each product with the surrogate ids of its material and category.
productos = spark.sql ("""
    select distinct id_prod as id,  nombre_prod as nombre, m.id as id_material, c.id as id_categoria, precio
    from ventas
    join materiales m on m.nombre = ventas.material_prod
    join categoria c on c.nombre = ventas.categoria_prod
    order by id
""")
productos.createOrReplaceTempView("productos")

# `id` is the key later used to join the sales facts to products. DISTINCT only
# removes fully identical rows, so a product seen with two prices (or names,
# materials, categories) would keep several rows and those joins would
# double-count sales. Fail loudly instead of producing inflated totals.
n_filas = productos.count()
n_ids = productos.select("id").distinct().count()
assert n_filas == n_ids, f"productos has {n_filas - n_ids} duplicated product ids"

productos.show()

+------+--------------------+-----------+------------+------+
|    id|              nombre|id_material|id_categoria|precio|
+------+--------------------+-----------+------------+------+
|  2809| Ablación viñamarino|          5|           1|    40|
|  4077|      Tribu agonista|          5|           4|    45|
|  6189|    Bóiler ciudadana|          2|           5|    65|
|  9171|Cuatrillón partur...|          2|           2|    75|
| 10902|  Xacena tetrasílabo|          3|           1|    45|
| 29075|  Repentista chicoco|          1|           5|    85|
| 34388|     Argucia diestro|          2|           2|    65|
| 36922|    Criancia íntegro|          5|           5|    40|
| 39377|    Complice acetoso|          5|           5|    45|
| 52175|  Carameleo leguleyo|          1|           2|    90|
| 58759|     Horca dilatable|          2|           4|    65|
| 70231|  Bizantino parlante|          5|           5|    35|
| 71840|Materia Inorgánic...|          2|           4|    65|
| 75896|

In [59]:
# Fact table: total importe (sum of precio) per year, month and product.
consulta_hechos = """
          SELECT date_part ('year', Fecha) as anyo, date_part('months', Fecha) as mes, id_prod, sum(precio) as importe
          from ventas
          group by anyo, mes, id_prod
          order by anyo, mes
          """
ventas_final = spark.sql(consulta_hechos)
ventas_final.createOrReplaceTempView(name="ventas_final")
ventas_final.show()

+----+---+-------+-------+
|anyo|mes|id_prod|importe|
+----+---+-------+-------+
|2018| 12| 953060|     30|
|2019|  1|  86091|  11180|
|2019|  1| 977219|   6840|
|2019|  1| 104454|   5460|
|2019|  1| 308114|  11025|
|2019|  1| 844797|  10650|
|2019|  1| 619589|  14915|
|2019|  1| 612297|  13410|
|2019|  1|   6189|  10465|
|2019|  1| 753623|   7875|
|2019|  1| 432964|  11200|
|2019|  1| 904052|   5560|
|2019|  1| 144628|   5635|
|2019|  1| 883449|  11700|
|2019|  1| 851349|   5460|
|2019|  1| 371293|   5775|
|2019|  1| 320893|   3650|
|2019|  1| 356608|  12750|
|2019|  1|  34388|   9620|
|2019|  1| 414511|   6880|
+----+---+-------+-------+
only showing top 20 rows



In [51]:
# Sanity check: year/month extracted from each sale, in chronological order.
anyo_mes_por_fecha = spark.sql("SELECT date_part ('year', Fecha) as anyo, date_part('months', Fecha) as mes from ventas order by fecha")
anyo_mes_por_fecha.show()

+----+---+
|anyo|mes|
+----+---+
|2018| 12|
|2019|  1|
|2019|  1|
|2019|  1|
|2019|  1|
|2019|  1|
|2019|  1|
|2019|  1|
|2019|  1|
|2019|  1|
|2019|  1|
|2019|  1|
|2019|  1|
|2019|  1|
|2019|  1|
|2019|  1|
|2019|  1|
|2019|  1|
|2019|  1|
|2019|  1|
+----+---+
only showing top 20 rows



In [60]:
# Capture each dimension view as a DataFrame so it can be exported below.
productos_final, materiales_final, categoria_final = (
    spark.sql(consulta)
    for consulta in (
        "select * from productos",
        "select * from materiales",
        "select * from categoria",
    )
)


In [62]:
def save_datamart(datamart, nombre, data_dir="/content/drive/My Drive/Colab Notebooks/DATA"):
    """Write a datamart DataFrame to ``<data_dir>/datamart_<nombre>.csv``.

    Uses the same layout as the source data: ';' separator and a header row.
    The frame is repartitioned to a single partition so Spark writes one part
    file inside the output directory; existing output at that path is overwritten.
    Uses the built-in CSV writer (``com.databricks.spark.csv`` is only a legacy alias).
    """
    (datamart.repartition(1)
        .write
        .mode("overwrite")
        .option("header", "true")
        .option("sep", ";")
        .csv(f"{data_dir}/datamart_{nombre}.csv"))


for nombre, datamart in [
    ("ventas", ventas_final),
    ("productos", productos_final),
    ("materiales", materiales_final),
    ("categoria", categoria_final),
]:
    save_datamart(datamart, nombre)

In [77]:
def load_datamart(nombre, data_dir="/content/drive/My Drive/Colab Notebooks/DATA"):
    """Read ``<data_dir>/datamart_<nombre>.csv`` back and register it as temp view ``datamart_<nombre>``.

    Reads with a header row, inferred column types and ';' as separator,
    matching how the datamarts were written.
    """
    (spark.read
        .csv(f"{data_dir}/datamart_{nombre}.csv", header=True, inferSchema=True, sep=';')
        .createOrReplaceTempView(f"datamart_{nombre}"))


for nombre in ("ventas", "productos", "materiales", "categoria"):
    load_datamart(nombre)

# Preview a few rows of each reloaded datamart.
spark.sql("SELECT * FROM datamart_ventas order by id_prod  limit 5 ").show()
spark.sql("SELECT * FROM datamart_productos order by id limit 5 ").show()
spark.sql("SELECT * FROM datamart_materiales limit 5").show()
spark.sql("SELECT * FROM datamart_categoria limit 5").show()

+----+---+-------+-------+
|anyo|mes|id_prod|importe|
+----+---+-------+-------+
|2019|  4|   2809|   6400|
|2019|  8|   2809|   6480|
|2019|  5|   2809|   5920|
|2019|  1|   2809|   5520|
|2019|  6|   2809|   6000|
+----+---+-------+-------+

+-----+--------------------+-----------+------------+------+
|   id|              nombre|id_material|id_categoria|precio|
+-----+--------------------+-----------+------------+------+
| 2809| Ablación viñamarino|          5|           1|    40|
| 4077|      Tribu agonista|          5|           4|    45|
| 6189|    Bóiler ciudadana|          2|           5|    65|
| 9171|Cuatrillón partur...|          2|           2|    75|
|10902|  Xacena tetrasílabo|          3|           1|    45|
+-----+--------------------+-----------+------------+------+

+---+----------+
| id|    nombre|
+---+----------+
|  1|     Cuero|
|  2|    Gamuza|
|  3|      Goma|
|  4|Sintéticos|
|  5|      Tela|
+---+----------+

+---+----------------+
| id|          nombre|
+---+-

In [78]:
# Find the month with the most sales: total importe per (year, month), highest first.
ventas_por_mes = spark.sql("""
          SELECT anyo, mes, sum(importe) as importe
          from datamart_ventas
          group by anyo, mes
          order by importe desc
          """)
ventas_por_mes.show()


+----+---+-------+
|anyo|mes|importe|
+----+---+-------+
|2021|  1|1738965|
|2020|  7|1735585|
|2019| 10|1735560|
|2021|  8|1733365|
|2020|  8|1732230|
|2019|  7|1731160|
|2019|  5|1729250|
|2020|  1|1728950|
|2021|  7|1728310|
|2021|  5|1727315|
|2021|  3|1726540|
|2019|  1|1725765|
|2020|  5|1725690|
|2019| 12|1725650|
|2020| 10|1724070|
|2019|  8|1722155|
|2019|  3|1721030|
|2020|  3|1717350|
|2020| 12|1708645|
|2020|  6|1691180|
+----+---+-------+
only showing top 20 rows



Mostrar las ventas (importe total) por material y mes a lo largo del histórico

In [85]:
# Total importe per year, month and material: facts -> products -> materials.
ventas_material_mes = spark.sql("""
          SELECT dv.anyo, dv.mes, dm.nombre as material, sum(dv.importe) as importe
          from datamart_ventas dv
          join datamart_productos dp on dp.id = dv.id_prod
          join datamart_materiales dm on dp.id_material = dm.id
          group by 1,2,3
          order by 1,2,3
          """)
ventas_material_mes.show()


+----+---+----------+-------+
|anyo|mes|  material|importe|
+----+---+----------+-------+
|2018| 12|Sintéticos|     30|
|2019|  1|     Cuero| 408565|
|2019|  1|    Gamuza| 643520|
|2019|  1|      Goma| 299225|
|2019|  1|Sintéticos|  65950|
|2019|  1|      Tela| 308505|
|2019|  2|     Cuero| 364520|
|2019|  2|    Gamuza| 584570|
|2019|  2|      Goma| 270560|
|2019|  2|Sintéticos|  57460|
|2019|  2|      Tela| 277155|
|2019|  3|     Cuero| 408970|
|2019|  3|    Gamuza| 644515|
|2019|  3|      Goma| 291860|
|2019|  3|Sintéticos|  68015|
|2019|  3|      Tela| 307670|
|2019|  4|     Cuero| 391600|
|2019|  4|    Gamuza| 628585|
|2019|  4|      Goma| 286035|
|2019|  4|Sintéticos|  63925|
+----+---+----------+-------+
only showing top 20 rows

