# 1.- Lectura de la base de datos

In [1]:
import os
import warnings

from pyspark.sql import SparkSession
import pyspark.sql.functions as F

In [2]:
# Build (or reuse) a local Spark session with the PostgreSQL JDBC driver jar on the
# classpath, then open a new session on top of it via newSession().
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("Engine")
    .config("spark.jars", "/home/jovyan/drivers/postgresql-42.2.18.jar")
    .getOrCreate()
    .newSession()
)

22/07/02 04:53:25 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


# 2.- Bronze Layer

## Extract_Maintenance

In [3]:
# Read the `tbl_maintenance` table from PostgreSQL over JDBC.
# Credentials come from POSTGRES_USER / POSTGRES_PASSWORD when set, falling back to the
# previous "airflow" defaults so existing runs behave the same.
# No partitionColumn/numPartitions options are given, so the next cell's output shows
# everything landing in a single partition.
df_db_maintenance = (
    spark.read.format("jdbc")
    .option("url", "jdbc:postgresql://postgres:5432/postgres")
    .option("dbtable", "tbl_maintenance")
    .option("user", os.environ.get("POSTGRES_USER", "airflow"))
    .option("password", os.environ.get("POSTGRES_PASSWORD", "airflow"))
    .option("driver", "org.postgresql.Driver")
    .load()
)

In [4]:
# Count rows per Spark partition to see how the JDBC read distributed the data.
(
    df_db_maintenance
    .withColumn("partition_id", F.spark_partition_id())
    .groupBy("partition_id")
    .agg(F.count("maintenance_id"))
    .orderBy("partition_id")
    .show()
)



+------------+---------------------+
|partition_id|count(maintenance_id)|
+------------+---------------------+
|           0|                 7944|
+------------+---------------------+





In [5]:
# Spread the single-partition JDBC result across 10 partitions before writing it out.
df_db_maintenance = df_db_maintenance.repartition(numPartitions=10)

In [6]:
# Re-check the per-partition row counts after repartitioning.
(
    df_db_maintenance
    .withColumn("partition_id", F.spark_partition_id())
    .groupBy("partition_id")
    .agg(F.count("maintenance_id"))
    .orderBy("partition_id")
    .show()
)

+------------+---------------------+
|partition_id|count(maintenance_id)|
+------------+---------------------+
|           0|                  794|
|           1|                  795|
|           2|                  795|
|           3|                  795|
|           4|                  795|
|           5|                  794|
|           6|                  794|
|           7|                  794|
|           8|                  794|
|           9|                  794|
+------------+---------------------+



## Load_Maintenance

In [7]:
# Persist the maintenance table to the bronze layer as Parquet, replacing any previous output.
(df_db_maintenance
 .write
 .mode("overwrite")
 .parquet("/home/jovyan/datalake/bronze_layer/tbl_maintenance"))

22/07/02 04:53:34 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
                                                                                

## Extract_Operation_Maintenance

In [8]:
# Read `tbl_operations_maintenance` (mtn_* date columns, maintenance_id, status_id) from
# PostgreSQL over JDBC.
# Credentials come from POSTGRES_USER / POSTGRES_PASSWORD when set, falling back to the
# previous "airflow" defaults.
df_db_operations_maintenance = (
    spark.read.format("jdbc")
    .option("url", "jdbc:postgresql://postgres:5432/postgres")
    .option("dbtable", "tbl_operations_maintenance")
    .option("user", os.environ.get("POSTGRES_USER", "airflow"))
    .option("password", os.environ.get("POSTGRES_PASSWORD", "airflow"))
    .option("driver", "org.postgresql.Driver")
    .load()
)

In [9]:
# Inspect the column types delivered by the JDBC source.
df_db_operations_maintenance.printSchema()

root
 |-- mtn_creation_date_id: string (nullable = true)
 |-- mtn_start_date_id: string (nullable = true)
 |-- mtn_end_date_id: string (nullable = true)
 |-- mtn_delivered_date_id: string (nullable = true)
 |-- mtn_creation_date: string (nullable = true)
 |-- mtn_start_date: string (nullable = true)
 |-- mtn_end_date: string (nullable = true)
 |-- mtn_delivered_date: string (nullable = true)
 |-- maintenance_id: integer (nullable = true)
 |-- status_id: integer (nullable = true)



## Load_Operation_Maintenance

In [14]:
# Copy status_id into a dedicated column used only for directory partitioning on write.
# Spark does not store partition columns inside the data files, so status_id itself stays
# in the Parquet files.
df_db_operations_maintenance = df_db_operations_maintenance.withColumn(
    "status_id_partition", df_db_operations_maintenance["status_id"]
)

In [15]:
# Write the operations table to the bronze layer, one directory per status_id_partition value,
# replacing any previous output.
(df_db_operations_maintenance
 .write
 .mode("overwrite")
 .partitionBy("status_id_partition")
 .parquet("/home/jovyan/datalake/bronze_layer/tbl_operations_maintenance"))

## Extract_type_maintenance

In [16]:
# Read the `cat_type_maintenance` lookup table from PostgreSQL over JDBC.
# Credentials come from POSTGRES_USER / POSTGRES_PASSWORD when set, falling back to the
# previous "airflow" defaults.
df_db_type_maintenance = (
    spark.read.format("jdbc")
    .option("url", "jdbc:postgresql://postgres:5432/postgres")
    .option("dbtable", "cat_type_maintenance")
    .option("user", os.environ.get("POSTGRES_USER", "airflow"))
    .option("password", os.environ.get("POSTGRES_PASSWORD", "airflow"))
    .option("driver", "org.postgresql.Driver")
    .load()
)

## Load_type_maintenance

In [17]:
# Persist the maintenance-type lookup to the bronze layer, replacing any previous output.
(df_db_type_maintenance
 .write
 .mode("overwrite")
 .parquet("/home/jovyan/datalake/bronze_layer/cat_type_maintenance"))

## Extract_type_package

In [18]:
# Read the `cat_type_package` lookup table from PostgreSQL over JDBC.
# Credentials come from POSTGRES_USER / POSTGRES_PASSWORD when set, falling back to the
# previous "airflow" defaults.
df_db_type_package = (
    spark.read.format("jdbc")
    .option("url", "jdbc:postgresql://postgres:5432/postgres")
    .option("dbtable", "cat_type_package")
    .option("user", os.environ.get("POSTGRES_USER", "airflow"))
    .option("password", os.environ.get("POSTGRES_PASSWORD", "airflow"))
    .option("driver", "org.postgresql.Driver")
    .load()
)

## Load_type_package

In [19]:
# Persist the package-type lookup to the bronze layer, replacing any previous output.
(df_db_type_package
 .write
 .mode("overwrite")
 .parquet("/home/jovyan/datalake/bronze_layer/cat_type_package"))

## Extract_type_status

In [20]:
# Read the `cat_type_status` lookup table from PostgreSQL over JDBC.
# Credentials come from POSTGRES_USER / POSTGRES_PASSWORD when set, falling back to the
# previous "airflow" defaults.
df_db_type_status = (
    spark.read.format("jdbc")
    .option("url", "jdbc:postgresql://postgres:5432/postgres")
    .option("dbtable", "cat_type_status")
    .option("user", os.environ.get("POSTGRES_USER", "airflow"))
    .option("password", os.environ.get("POSTGRES_PASSWORD", "airflow"))
    .option("driver", "org.postgresql.Driver")
    .load()
)

## Load_type_status

In [21]:
# Persist the status lookup to the bronze layer, replacing any previous output.
(df_db_type_status
 .write
 .mode("overwrite")
 .parquet("/home/jovyan/datalake/bronze_layer/cat_type_status"))

# 3.- Silver Layer

## Extract

In [31]:
# Load the bronze Parquet datasets back for the silver-layer transformations.
# NOTE: the trailing `/*` glob points Spark at the status_id_partition=... directories
# themselves, so the partition column is not re-discovered for the operations table
# (its schema below has no status_id_partition; status_id is still in the files).
# The misspelled name df_silver_operations_maintenace is kept because later cells use it.
df_silver_maintenance = spark.read.parquet(
    "/home/jovyan/datalake/bronze_layer/tbl_maintenance/*")
df_silver_operations_maintenace = spark.read.parquet(
    "/home/jovyan/datalake/bronze_layer/tbl_operations_maintenance/*")
df_silver_type_maintenance = spark.read.parquet(
    "/home/jovyan/datalake/bronze_layer/cat_type_maintenance/*")
df_silver_type_package = spark.read.parquet(
    "/home/jovyan/datalake/bronze_layer/cat_type_package/*")
df_silver_type_status = spark.read.parquet(
    "/home/jovyan/datalake/bronze_layer/cat_type_status/*")


In [32]:
# Per-partition row counts of the maintenance data as read back from Parquet.
(
    df_silver_maintenance
    .withColumn("partition_id", F.spark_partition_id())
    .groupBy("partition_id")
    .agg(F.count("maintenance_id"))
    .orderBy("partition_id")
    .show()
)

+------------+---------------------+
|partition_id|count(maintenance_id)|
+------------+---------------------+
|           0|                 1589|
|           1|                 1588|
|           2|                 1589|
|           3|                 1590|
|           4|                 1588|
+------------+---------------------+



## Transform

In [33]:
# Confirm the operations schema after the Parquet round-trip.
df_silver_operations_maintenace.printSchema()

root
 |-- mtn_creation_date_id: string (nullable = true)
 |-- mtn_start_date_id: string (nullable = true)
 |-- mtn_end_date_id: string (nullable = true)
 |-- mtn_delivered_date_id: string (nullable = true)
 |-- mtn_creation_date: string (nullable = true)
 |-- mtn_start_date: string (nullable = true)
 |-- mtn_end_date: string (nullable = true)
 |-- mtn_delivered_date: string (nullable = true)
 |-- maintenance_id: integer (nullable = true)
 |-- status_id: integer (nullable = true)



In [34]:
# Keep only maintenances with a non-zero price.
# NOTE: rows whose price_maintenance is NULL are dropped as well, because NULL != 0.0
# evaluates to NULL and filter() keeps only rows where the predicate is true.
df_silver_maintenance = df_silver_maintenance.filter(F.col("price_maintenance") != 0.0)

In [35]:
# Number of maintenances remaining after the price filter.
df_silver_maintenance.count()

4383

In [36]:
# Attach the mtn_* date columns and status_id from the operations table.
# The left join keeps maintenances without operations; a maintenance_id with several
# operation rows produces several output rows.
df_silver_complete = df_silver_maintenance.join(
    df_silver_operations_maintenace, on="maintenance_id", how="left"
)

In [37]:
# Enrich with the status lookup (adds status_name plus that table's `date` column).
df_silver_complete = df_silver_complete.join(
    df_silver_type_status, on="status_id", how="left"
)

In [39]:
# Enrich with the package lookup (adds package_name plus another `date` column).
df_silver_complete = df_silver_complete.join(
    df_silver_type_package, on="package_id", how="left"
)

In [40]:
# Enrich with the maintenance-type lookup (adds type_name plus a third `date` column).
df_silver_complete = df_silver_complete.join(
    df_silver_type_maintenance, on="type_id", how="left"
)

In [41]:
# Row count after all joins.
# NOTE(review): the recorded result (4391) exceeds the filtered maintenance count (4383), so
# at least one join key matched several rows — confirm the duplication is expected.
df_silver_complete.count()

4391

In [43]:
# Preview the first 20 joined rows one field per line (the frame is too wide for tabular output).
df_silver_complete.show(n=20, vertical=True)

-RECORD 0------------------------------------
 type_id               | 6                   
 package_id            | 200                 
 status_id             | 2                   
 maintenance_id        | 32199               
 stock_id              | 19656               
 estimate_id           | 0                   
 price_maintenance     | 1100.0              
 mtn_creation_date_id  | 2021031500          
 mtn_start_date_id     | 2021032015          
 mtn_end_date_id       | 2021032019          
 mtn_delivered_date_id | 2021032423          
 mtn_creation_date     | 2021-03-15 00:00:00 
 mtn_start_date        | 2021-03-20 15:00:00 
 mtn_end_date          | 2021-03-20 19:00:00 
 mtn_delivered_date    | 2021-03-24 18:50:41 
 status_name           | Delivered           
 date                  | 2019-09-04 18:00:00 
 package_name          | Paying Customer     
 date                  | 2019-09-04 17:00:00 
 type_name             | Basic               
 date                  | 2019-09-0

## Load

In [46]:
# Inspect the joined schema; note the three `date` columns coming from the lookup tables.
df_silver_complete.printSchema()

root
 |-- type_id: integer (nullable = true)
 |-- package_id: integer (nullable = true)
 |-- status_id: integer (nullable = true)
 |-- maintenance_id: integer (nullable = true)
 |-- stock_id: integer (nullable = true)
 |-- estimate_id: integer (nullable = true)
 |-- price_maintenance: double (nullable = true)
 |-- mtn_creation_date_id: string (nullable = true)
 |-- mtn_start_date_id: string (nullable = true)
 |-- mtn_end_date_id: string (nullable = true)
 |-- mtn_delivered_date_id: string (nullable = true)
 |-- mtn_creation_date: string (nullable = true)
 |-- mtn_start_date: string (nullable = true)
 |-- mtn_end_date: string (nullable = true)
 |-- mtn_delivered_date: string (nullable = true)
 |-- status_name: string (nullable = true)
 |-- date: string (nullable = true)
 |-- package_name: string (nullable = true)
 |-- date: string (nullable = true)
 |-- type_name: string (nullable = true)
 |-- date: string (nullable = true)



In [47]:
# Remove the lookup tables' `date` columns; dropping by name removes every column
# called "date" in a single call.
df_silver_complete = df_silver_complete.drop("date")

In [48]:
# Persist the joined silver dataset to PostgreSQL, replacing the target table.
# Credentials come from POSTGRES_USER / POSTGRES_PASSWORD when set, falling back to the
# previous "airflow" defaults.
# NOTE(review): the target table is named 'example', which looks like a placeholder —
# confirm the intended table name.
properties = {
    "user": os.environ.get("POSTGRES_USER", "airflow"),
    "password": os.environ.get("POSTGRES_PASSWORD", "airflow"),
    "driver": "org.postgresql.Driver",
}

(df_silver_complete
 .write
 .jdbc(url="jdbc:postgresql://postgres:5432/postgres", table='example', mode='overwrite', properties=properties)
)

In [None]:
# NOTE(review): this cell was never executed (In [None]) and repeats the Extract_type_status
# read of `cat_type_status`; it looks like a leftover and can likely be removed.
# Credentials come from POSTGRES_USER / POSTGRES_PASSWORD when set, falling back to the
# previous "airflow" defaults.
df_db_type_status = (
    spark.read.format("jdbc")
    .option("url", "jdbc:postgresql://postgres:5432/postgres")
    .option("dbtable", "cat_type_status")
    .option("user", os.environ.get("POSTGRES_USER", "airflow"))
    .option("password", os.environ.get("POSTGRES_PASSWORD", "airflow"))
    .option("driver", "org.postgresql.Driver")
    .load()
)