In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.types import LongType,DateType,StringType
from pyspark.sql.types import StructField, StructType, BooleanType, ArrayType, IntegerType, DoubleType, FloatType
from delta import *
from delta.tables import *
from hdfs import InsecureClient
from os import PathLike
import pandas as teste
from pyspark.sql.functions import *
from pyspark.sql.functions import max as sparkMax

# warehouse_location points to the default location for managed databases and tables
warehouse_location = 'hdfs://hdfs-nn:9000/project/uc2/bronze/warehouse'

# Criação da sessão spark
builder = SparkSession \
    .builder \
    .appName("Python Spark SQL Hive integration example") \
    .config("spark.sql.warehouse.dir", warehouse_location) \
    .config("hive.metastore.uris", "thrift://hive-metastore:9083") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.jars.packages", "io.delta:delta-core_2.12:1.0.0") \
    .enableHiveSupport() \

spark = spark = configure_spark_with_delta_pip(builder).getOrCreate()

In [2]:
# EXEMPLO PARA FAZER UPLOAD DO DATASET VEHICLE_INFORMATION
# COPY FILE TO BRONZE LAYER

#client = InsecureClient("http://hdfs-nn:9870", user="anonymous")

#from_path = "./Vehicle_Information.csv"
#to_path = "/project/uc2/bronze/datasets/Vehicle_Information.csv"

#client.delete(to_path)

#client.upload(to_path, from_path)

In [3]:
# Eliminação da Base de Dados caso exista
spark.sql(
    """
    DROP DATABASE IF EXISTS vehicle CASCADE
    """
)

DataFrame[]

In [4]:
# Criação da base de dados onde vão ser guardados todos os dados do bronze na localização project/uc2/bronze/warehouse
spark.sql(
    """
    CREATE DATABASE IF NOT EXISTS vehicle LOCATION 'hdfs://hdfs-nn:9000/project/uc2/bronze/warehouse'
    """
)

DataFrame[]

In [5]:
# Mostrar todas as bases de Dados existentes 
spark.sql(
    """
    SHOW DATABASES
    """
).show()

+--------------------+
|           namespace|
+--------------------+
|     accident_silver|
|           accidents|
|             default|
|                demo|
|                gold|
|                novo|
|            products|
|               sales|
|            severity|
|             vehicle|
|vehicle_informati...|
|      vehicle_silver|
+--------------------+



In [6]:
# Eliminar tabela caso exista
spark.sql(
    """
    DROP TABLE IF EXISTS vehicle.vehicle_deltalake_table
    """
)

# CRIAÇÃO DA TABELA BRONZE 
spark.sql(
    """
    CREATE EXTERNAL TABLE vehicle.vehicle_deltalake_table (
        Accident_Index STRING,
        Age_Band_of_Driver STRING,
        Age_of_Vehicle STRING,
        Driver_Home_Area_Type STRING,
        Driver_IMD_Decile STRING,
        Engine_Capacity_CC STRING,
        Hit_Object_in_Carriageway STRING,
        Hit_Object_off_Carriageway STRING,
        Journey_Purpose_of_Driver STRING,
        Junction_Location STRING,
        make STRING,
        model STRING,
        Propulsion_Code STRING,
        Sex_of_Driver STRING,
        Skidding_and_Overturning STRING,
        Towing_and_Articulation STRING,
        Vehicle_Leaving_Carriageway STRING,
        Vehicle_Location_Restricted_Lane STRING,
        Vehicle_Manoeuvre STRING,
        Vehicle_Reference STRING,
        Vehicle_Type STRING,
        Was_Vehicle_Left_Hand_Drive STRING,
        X1st_Point_of_Impact STRING,
        Year STRING
    )
    USING DELTA
    LOCATION 'hdfs://hdfs-nn:9000/project/uc2/bronze/warehouse/vehicle.db/vehicle_deltalake_table/'
    """
)

DataFrame[]

In [7]:
# Mostrar Tabelas da Base de Dados Vehicle
spark.sql(
    """
    SHOW TABLES FROM vehicle
    """
).show()

+--------+--------------------+-----------+
|database|           tableName|isTemporary|
+--------+--------------------+-----------+
| vehicle|vehicle_deltalake...|      false|
+--------+--------------------+-----------+



In [8]:
# Check data from table (check 1)
spark.sql(
    """
    SELECT *
    FROM vehicle.vehicle_deltalake_table
    """
).show()

+--------------+------------------+--------------+---------------------+-----------------+------------------+-------------------------+--------------------------+-------------------------+-----------------+----+-----+---------------+-------------+------------------------+-----------------------+---------------------------+--------------------------------+-----------------+-----------------+------------+---------------------------+--------------------+----+
|Accident_Index|Age_Band_of_Driver|Age_of_Vehicle|Driver_Home_Area_Type|Driver_IMD_Decile|Engine_Capacity_CC|Hit_Object_in_Carriageway|Hit_Object_off_Carriageway|Journey_Purpose_of_Driver|Junction_Location|make|model|Propulsion_Code|Sex_of_Driver|Skidding_and_Overturning|Towing_and_Articulation|Vehicle_Leaving_Carriageway|Vehicle_Location_Restricted_Lane|Vehicle_Manoeuvre|Vehicle_Reference|Vehicle_Type|Was_Vehicle_Left_Hand_Drive|X1st_Point_of_Impact|Year|
+--------------+------------------+--------------+---------------------+------

In [9]:
# Descrição dos formatos de dados que constituem a tabela criada 
spark.sql(
    """
    DESCRIBE FORMATTED vehicle.vehicle_deltalake_table
    """
).toPandas()

Unnamed: 0,col_name,data_type,comment
0,Accident_Index,string,
1,Age_Band_of_Driver,string,
2,Age_of_Vehicle,string,
3,Driver_Home_Area_Type,string,
4,Driver_IMD_Decile,string,
5,Engine_Capacity_CC,string,
6,Hit_Object_in_Carriageway,string,
7,Hit_Object_off_Carriageway,string,
8,Journey_Purpose_of_Driver,string,
9,Junction_Location,string,


In [10]:
# Let's put the files into HDFS

In [11]:
spark.sql(
    """
    SELECT *
    FROM vehicle.vehicle_deltalake_table
    """
).toPandas()

Unnamed: 0,Accident_Index,Age_Band_of_Driver,Age_of_Vehicle,Driver_Home_Area_Type,Driver_IMD_Decile,Engine_Capacity_CC,Hit_Object_in_Carriageway,Hit_Object_off_Carriageway,Journey_Purpose_of_Driver,Junction_Location,...,Skidding_and_Overturning,Towing_and_Articulation,Vehicle_Leaving_Carriageway,Vehicle_Location_Restricted_Lane,Vehicle_Manoeuvre,Vehicle_Reference,Vehicle_Type,Was_Vehicle_Left_Hand_Drive,X1st_Point_of_Impact,Year


In [12]:
#read hdfs file to dataframe
hdfs_path = "hdfs://hdfs-nn:9000/project/uc2/bronze/Datasets/Vehicle_Information.csv"

#define the schema for the dataframe
customSchema = StructType([
    StructField("Accident_Index", StringType(), True),        
    StructField("Age_Band_of_Driver", StringType(), True),
    StructField("Age_of_Vehicle", StringType(), True),
    StructField("Driver_Home_Area_Type", StringType(), True),
    StructField("Driver_IMD_Decile", StringType(), True),
    StructField("Engine_Capacity_CC", StringType(), True),
    StructField("Hit_Object_in_Carriageway", StringType(), True),
    StructField("Hit_Object_off_Carriageway", StringType(), True),
    StructField("Journey_Purpose_of_Driver", StringType(), True),
    StructField("Junction_Location", StringType(), True),
    StructField("make", StringType(), True),
    StructField("model", StringType(), True),
    StructField("Propulsion_Code", StringType(), True),
    StructField("Sex_of_Driver", StringType(), True),
    StructField("Skidding_and_Overturning", StringType(), True),        
    StructField("Towing_and_Articulation", StringType(), True),
    StructField("Vehicle_Leaving_Carriageway", StringType(), True),
    StructField("Vehicle_Location_Restricted_Lane", StringType(), True),
    StructField("Vehicle_Manoeuvre", StringType(), True),
    StructField("Vehicle_Reference", StringType(), True),
    StructField("Vehicle_Type", StringType(), True),
    StructField("Was_Vehicle_Left_Hand_Drive", StringType(), True),
    StructField("X1st_Point_of_Impact", StringType(), True),
    StructField("Year", StringType(), True)
])

vehicle = spark \
            .read\
            .option("delimiter",",")\
            .option("header","true")\
            .schema(customSchema) \
            .csv(hdfs_path)

In [13]:
vehicle.show()
vehicle.printSchema()

+--------------+--------------------+--------------+---------------------+-----------------+------------------+-------------------------+--------------------------+-------------------------+--------------------+----------------+--------------------+---------------+-------------+------------------------+-----------------------+---------------------------+--------------------------------+--------------------+-----------------+--------------------+---------------------------+--------------------+----+
|Accident_Index|  Age_Band_of_Driver|Age_of_Vehicle|Driver_Home_Area_Type|Driver_IMD_Decile|Engine_Capacity_CC|Hit_Object_in_Carriageway|Hit_Object_off_Carriageway|Journey_Purpose_of_Driver|   Junction_Location|            make|               model|Propulsion_Code|Sex_of_Driver|Skidding_and_Overturning|Towing_and_Articulation|Vehicle_Leaving_Carriageway|Vehicle_Location_Restricted_Lane|   Vehicle_Manoeuvre|Vehicle_Reference|        Vehicle_Type|Was_Vehicle_Left_Hand_Drive|X1st_Point_of_Impac

In [14]:
# Write df to hive deltalake_table
vehicle\
    .select("Accident_Index","Age_Band_of_Driver","Age_of_Vehicle","Driver_Home_Area_Type","Driver_IMD_Decile","Engine_Capacity_CC","Hit_Object_in_Carriageway","Hit_Object_off_Carriageway",
    "Journey_Purpose_of_Driver","Junction_Location","make","model","Propulsion_Code","Sex_of_Driver","Skidding_and_Overturning",
    "Towing_and_Articulation","Vehicle_Leaving_Carriageway","Vehicle_Location_Restricted_Lane","Vehicle_Manoeuvre","Vehicle_Reference","Vehicle_Type",
    "Was_Vehicle_Left_Hand_Drive","X1st_Point_of_Impact","Year") \
    .write \
    .mode("overwrite")\
    .format("delta")\
    .save("hdfs://hdfs-nn:9000/project/uc2/bronze/warehouse/vehicle.db/vehicle_deltalake_table")


In [15]:
vehicle.show()

+--------------+--------------------+--------------+---------------------+-----------------+------------------+-------------------------+--------------------------+-------------------------+--------------------+----------------+--------------------+---------------+-------------+------------------------+-----------------------+---------------------------+--------------------------------+--------------------+-----------------+--------------------+---------------------------+--------------------+----+
|Accident_Index|  Age_Band_of_Driver|Age_of_Vehicle|Driver_Home_Area_Type|Driver_IMD_Decile|Engine_Capacity_CC|Hit_Object_in_Carriageway|Hit_Object_off_Carriageway|Journey_Purpose_of_Driver|   Junction_Location|            make|               model|Propulsion_Code|Sex_of_Driver|Skidding_and_Overturning|Towing_and_Articulation|Vehicle_Leaving_Carriageway|Vehicle_Location_Restricted_Lane|   Vehicle_Manoeuvre|Vehicle_Reference|        Vehicle_Type|Was_Vehicle_Left_Hand_Drive|X1st_Point_of_Impac

In [16]:
# Now lets extract and transform data

In [17]:
vehicle_df = (spark.read.format('csv')
           .option('header', 'True')
           .option('delimiter', ",")
           .option("inferSchema", "true")
           .load('hdfs://hdfs-nn:9000/project/uc2/bronze/Datasets/Vehicle_Information.csv'))

In [18]:
# Verificar Dados
vehicle_df.printSchema()
#vehicle_df.toPandas()

root
 |-- Accident_Index: string (nullable = true)
 |-- Age_Band_of_Driver: string (nullable = true)
 |-- Age_of_Vehicle: string (nullable = true)
 |-- Driver_Home_Area_Type: string (nullable = true)
 |-- Driver_IMD_Decile: string (nullable = true)
 |-- Engine_Capacity_.CC.: string (nullable = true)
 |-- Hit_Object_in_Carriageway: string (nullable = true)
 |-- Hit_Object_off_Carriageway: string (nullable = true)
 |-- Journey_Purpose_of_Driver: string (nullable = true)
 |-- Junction_Location: string (nullable = true)
 |-- make: string (nullable = true)
 |-- model: string (nullable = true)
 |-- Propulsion_Code: string (nullable = true)
 |-- Sex_of_Driver: string (nullable = true)
 |-- Skidding_and_Overturning: string (nullable = true)
 |-- Towing_and_Articulation: string (nullable = true)
 |-- Vehicle_Leaving_Carriageway: string (nullable = true)
 |-- Vehicle_Location.Restricted_Lane: string (nullable = true)
 |-- Vehicle_Manoeuvre: string (nullable = true)
 |-- Vehicle_Reference: intege

In [19]:
# Transformações 
df_final = (vehicle_df
            # Transformações do tipo String - Integer
            .withColumn("Year", vehicle_df["Year"].cast('integer'))
            .withColumn("Age_of_Vehicle", vehicle_df["Age_of_Vehicle"].cast('integer'))
            
            # Renomear colunas
            .withColumnRenamed('make','Brand')
            .withColumnRenamed('Year','Accident_year')
            .withColumnRenamed('Engine_Capacity_.CC.','Engine_Capacity_CC')
            
            # Eliminar colunas e duplicados
            .drop("Vehicle_Location.Restricted_Lane","Driver_Home_Area_Type","Driver_IMD_Decile","Hit_Object_in_Carriageway","Hit_Object_off_Carriageway","Journey_Purpose_of_Driver","Junction_Location","model","Propulsion_Code","Sex_of_Driver","Skidding_and_Overturning","Towing_and_Articulation","Vehicle_Leaving_Carriageway","Vehicle_Location_Restricted_Lane","Vehicle_Manoeuvre","Vehicle_Reference","Was_Vehicle_Left_Hand_Drive","X1st_Point_of_Impact")
            .drop_duplicates()
            
            # Substituir
            .replace('109', 'Other Vehicle_Type', 'Vehicle_Type')
            .replace('108', 'Other Vehicle_Type', 'Vehicle_Type')
            .replace('106', 'Other Vehicle_Type', 'Vehicle_Type')
            
            
           )
#df_final.show()
df_final.printSchema()
df_final.toPandas()

root
 |-- Accident_Index: string (nullable = true)
 |-- Age_Band_of_Driver: string (nullable = true)
 |-- Age_of_Vehicle: integer (nullable = true)
 |-- Engine_Capacity_CC: string (nullable = true)
 |-- Brand: string (nullable = true)
 |-- Vehicle_Type: string (nullable = true)
 |-- Accident_year: integer (nullable = true)



Unnamed: 0,Accident_Index,Age_Band_of_Driver,Age_of_Vehicle,Engine_Capacity_CC,Brand,Vehicle_Type,Accident_year
0,200401BS00415,26 - 35,3.0,7300,VOLVO,Bus or coach (17 or more pass seats),2004
1,200401DM00320,36 - 45,,,SCANIA,Goods 7.5 tonnes mgw and over,2004
2,200401DM00595,26 - 35,1.0,8268,DENNIS,Bus or coach (17 or more pass seats),2004
3,200401DM00980,36 - 45,1.0,2685,MERCEDES,Other Vehicle_Type,2004
4,200401DM01163,26 - 35,,,FODEN,Goods 7.5 tonnes mgw and over,2004
...,...,...,...,...,...,...,...
2174955,201697UC00506,Over 75,3.0,1349,MAZDA,Car,2016
2174956,2016981100616,46 - 55,6.0,1242,FORD,Car,2016
2174957,2016982108916,46 - 55,18.0,1332,TOYOTA,Car,2016
2174958,2016982109016,36 - 45,16.0,1896,VOLKSWAGEN,Car,2016


In [20]:
# Guardar 
df_final\
    .write\
    .mode("overwrite")\
    .format("delta")\
    .option ("overwriteSchema", "true")\
    .save("hdfs://hdfs-nn:9000/project/uc2/bronze/warehouse/vehicle.db/vehicle_deltalake_table/")

In [21]:
df_final = spark.read.format("delta").load("hdfs://hdfs-nn:9000/project/uc2/bronze/warehouse/vehicle.db/vehicle_deltalake_table/")

# Verificar 
df_final.toPandas()

Unnamed: 0,Accident_Index,Age_Band_of_Driver,Age_of_Vehicle,Engine_Capacity_CC,Brand,Vehicle_Type,Accident_year
0,200401BS00495,26 - 35,27.0,123,PIAGGIO,Motorcycle 125cc and under,2004
1,200401CP00235,36 - 45,16.0,647,HONDA,Other Vehicle_Type,2004
2,200401CW10036,36 - 45,,,VOLKSWAGEN,Other Vehicle_Type,2004
3,200401CW10287,Data missing or out of range,13.0,2962,MERCEDES,Other Vehicle_Type,2004
4,200401DM00493,21 - 25,,1997,PEUGEOT,Other Vehicle_Type,2004
...,...,...,...,...,...,...,...
2174955,201697UC70407,21 - 25,2.0,1968,SEAT,Car,2016
2174956,201697UD04304,46 - 55,,,,Car,2016
2174957,2016983125816,26 - 35,2.0,102,PEUGEOT,Motorcycle 125cc and under,2016
2174958,2016984107016,26 - 35,6.0,1598,VAUXHALL,Car,2016


In [22]:
# Now lets load this into a Silver Table

In [23]:
vehicle_silver_df = spark.read.format("delta").load("hdfs://hdfs-nn:9000/project/uc2/bronze/warehouse/vehicle.db/vehicle_deltalake_table/")

In [24]:
# Criar uma view,chamada vehicle_silver_view, dos dados que foram guardados no bronze 
vehicle_silver_df.createOrReplaceTempView('vehicle_silver_view')

# Criar um parquet em caso de necessidade
vehicle_silver_df.repartition(2).write.mode('overwrite').save('hdfs://hdfs-nn:9000/project/uc2/bronze/warehouse/df_final.parquet')

In [25]:
# Eliminar Base de Dados vehicle_silver caso exista
spark.sql(
"""
    DROP DATABASE IF EXISTS vehicle_silver CASCADE
"""
)

DataFrame[]

In [26]:
# Criação da Base de Dados para Silver
spark.sql(
    """
    CREATE DATABASE vehicle_silver LOCATION 'hdfs://hdfs-nn:9000/project/uc2/silver/warehouse/vehicle_silver.db/'
    """
)

DataFrame[]

In [27]:
# Confirmar a criação da Base de Dados
spark.sql(
    """
    SHOW DATABASES
    """
).show()

+--------------------+
|           namespace|
+--------------------+
|     accident_silver|
|           accidents|
|             default|
|                demo|
|                gold|
|                novo|
|            products|
|               sales|
|            severity|
|             vehicle|
|vehicle_informati...|
|      vehicle_silver|
+--------------------+



In [28]:
# Eliminação da tabela caso exista
spark.sql(
    """
    DROP TABLE IF EXISTS vehicle_silver.vehicle_silver_deltalake_table
    """
)

DataFrame[]

In [29]:
# Confirmar a Eliminação da mesma
spark.sql(
    """
    SHOW TABLES
    """
).show()

+--------+-------------------+-----------+
|database|          tableName|isTemporary|
+--------+-------------------+-----------+
|        |vehicle_silver_view|       true|
+--------+-------------------+-----------+



In [30]:
# Criação da Tabela Silver 
spark.sql(
    """
    CREATE EXTERNAL TABLE vehicle_silver.vehicle_silver_deltalake_table (
    
        Accident_Index STRING,
        Age_Band_of_Driver STRING,
        Age_of_Vehicle INTEGER,
        Engine_Capacity_CC STRING,
        Brand STRING,
        Vehicle_Type STRING
    )
    USING DELTA
    PARTITIONED BY (
        Accident_year INT
    )

    LOCATION 'hdfs://hdfs-nn:9000/project/uc2/silver/warehouse/vehicle_silver.db/vehicle_silver_deltalake_table/'
    """
)

DataFrame[]

In [31]:
# Insert data saved in the view into the table just created
spark.sql(
    """
    INSERT INTO vehicle_silver.vehicle_silver_deltalake_table
    SELECT * FROM vehicle_silver_view ;

    """
)

DataFrame[]

In [32]:
spark.sql(
    """
    SELECT *
    FROM vehicle_silver.vehicle_silver_deltalake_table
    """
).toPandas() 

Unnamed: 0,Accident_Index,Age_Band_of_Driver,Age_of_Vehicle,Engine_Capacity_CC,Brand,Vehicle_Type,Accident_year
0,2016010000229,36 - 45,17.0,1199,VAUXHALL,Car,2016
1,2016010000332,46 - 55,,,,Car,2016
2,2016010000459,26 - 35,16.0,4200,TOYOTA,Car,2016
3,2016010000524,26 - 35,,125,SYM,Motorcycle over 125cc and up to 500cc,2016
4,2016010000728,36 - 45,1.0,1999,JAGUAR,Car,2016
...,...,...,...,...,...,...,...
2174955,200597UD21702,21 - 25,5.0,1199,VAUXHALL,Car,2005
2174956,2005983129005,21 - 25,8.0,1905,CITROEN,Car,2005
2174957,2005983139905,66 - 75,5.0,2902,KIA,Taxi/Private hire car,2005
2174958,2005983170505,56 - 65,1.0,1390,RENAULT,Car,2005


In [33]:
#test if everything is ok
teste = spark.read.format("delta").load("hdfs://hdfs-nn:9000/project/uc2/silver/warehouse/vehicle_silver.db/vehicle_silver_deltalake_table")

In [35]:
spark.stop()