In [1]:
from os import PathLike
from hdfs import InsecureClient
from pyspark.sql import SparkSession
from pyspark.sql import Row
from delta import *
from pyspark.sql.functions import *
from pyspark.sql.types import LongType, StringType, StructField, StructType, BooleanType, ArrayType, IntegerType,FloatType

warehouse_location = 'hdfs://hdfs-nn:9870/TrabalhoPL'

builder = SparkSession \
    .builder \
    .master("local[2]") \
    .appName("Python Spark DataFrames and SQL") \
    .config("spark.sql.warehouse.dir", warehouse_location) \
    .config("hive.metastore.uris", "thrift://hive-metastore:9083") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.jars.packages", "io.delta:delta-core_2.12:1.0.0") \
    .enableHiveSupport() \

spark = configure_spark_with_delta_pip(builder).getOrCreate()

In [2]:
hdfs_path = "hdfs://hdfs-nn:9000/TrabalhoPL/bronze/spill-incidents.csv"

customSchema = StructType([
    StructField("Spill Number", StringType(), True),        
    StructField("Program Facility Name", StringType(), True),
    StructField("Street 1", StringType(), True),
    StructField("Street 2", StringType(), True),
    StructField("Locality", StringType(), True),
    StructField("Contry", StringType(), True),
    StructField("ZIP Code", StringType(), True),
    StructField("SWIS Code", IntegerType(), True), 
    StructField("DEC Region", IntegerType(), True),  
    StructField("Spill Date", StringType(), True),
    StructField("Received Date", StringType(), True), 
    StructField("Contributing Factor", StringType(), True),
    StructField("Waterbody", StringType(), True),
    StructField("Source", StringType(), True),
    StructField("Close Date", StringType(), True),
    StructField("Material Name", StringType(), True),
    StructField("Material Family", StringType(), True),
    StructField("Quantity", FloatType(), True),
    StructField("Units", StringType(), True),
    StructField("Recovered", FloatType(), True)
    
])

projeto_spill = spark \
            .read\
            .option("delimiter",",")\
            .option("header","true")\
            .schema(customSchema) \
            .csv(hdfs_path)
projeto_spill.show()

+------------+---------------------+--------------------+--------+----------------+-----------+--------+---------+----------+--------------------+--------------------+-------------------+----------+--------------------+--------------------+------------------+---------------+--------+-------+---------+
|Spill Number|Program Facility Name|            Street 1|Street 2|        Locality|     Contry|ZIP Code|SWIS Code|DEC Region|          Spill Date|       Received Date|Contributing Factor| Waterbody|              Source|          Close Date|     Material Name|Material Family|Quantity|  Units|Recovered|
+------------+---------------------+--------------------+--------+----------------+-----------+--------+---------+----------+--------------------+--------------------+-------------------+----------+--------------------+--------------------+------------------+---------------+--------+-------+---------+
|    0107132 |    MH 864        ...|  RT 119/MILLWOOD RD|    null|        ELMSFORD|Westches

In [3]:
from pyspark.sql.functions import when, col, concat, lit

replaced_projeto_spill = projeto_spill.withColumn(
    "Spill Date",
    when(
        (col("Spill Date").isNull() | (col("Spill Date") == None)), 
        "0000-00-00T00:00:00.000"
    ).otherwise(col("Spill Date")))

In [4]:
replaced_projeto_spill2 = replaced_projeto_spill.withColumn(
    "Received Date",
    when(
        (col("Received Date").isNull() | (col("Received Date") == None)), 
        "0000-00-00T00:00:00.000"
    ).otherwise(col("Received Date")))

In [5]:
replaced_projeto_spill3 = replaced_projeto_spill2.withColumn(
    "Close Date",
    when(
        (col("Close Date").isNull() | (col("Close Date") == None)), 
        "0000-00-00T00:00:00.000"
    ).otherwise(col("Close Date")))

In [6]:
replaced_projeto_spill4 = replaced_projeto_spill3.withColumn('Data_Derrame', split(replaced_projeto_spill3['Spill Date'], 'T').getItem(0)) \
                                                .withColumn('Data_Relatada', split(replaced_projeto_spill3['Received Date'], 'T').getItem(0)) \
                                                .withColumn('Data_Fim', split(replaced_projeto_spill3['Close Date'], 'T').getItem(0)) \
                                                .drop(col("Spill Date")) \
                                                .drop(col("Received Date")) \
                                                .drop(col("Close Date")) \
                                                .drop(col("Street 2")) \
                                                .drop(col("ZIP Code"))
replaced_projeto_spill4.toPandas()

Unnamed: 0,Spill Number,Program Facility Name,Street 1,Locality,Contry,SWIS Code,DEC Region,Contributing Factor,Waterbody,Source,Material Name,Material Family,Quantity,Units,Recovered,Data_Derrame,Data_Relatada,Data_Fim
0,0107132,MH 864,RT 119/MILLWOOD RD,ELMSFORD,Westchester,6000,3,Unknown,,Unknown,unknown material,Other,10.0,Gallons,0.0,2001-10-10,2001-10-10,2001-10-15
1,0405586,BOWRY BAY,WATER POLL CONTROL,QUEENS,Queens,4101,2,Other,EAST RIVER,Unknown,raw sewage,Other,0.0,Pounds,0.0,2004-08-21,2004-08-21,2004-09-17
2,0405586,BOWRY BAY,WATER POLL CONTROL,QUEENS,Queens,4101,2,Other,EAST RIVER,Unknown,raw sewage,Other,0.0,,0.0,2004-08-21,2004-08-21,2004-09-17
3,0204667,POLE 16091,GRACE AVE/BURKE AVE,BRONX,Bronx,301,2,Equipment Failure,,Commercial/Industrial,transformer oil,Petroleum,1.0,Gallons,0.0,2002-08-02,2002-08-02,2002-10-28
4,0210559,POLE ON,FERDALE LOMIS RD / RT 52,LIBERTY,Sullivan,5336,3,Traffic Accident,,Commercial/Industrial,transformer oil,Petroleum,6.0,Gallons,6.0,2003-01-20,2003-01-20,2003-01-22
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
499052,9904003,`-NYCT,5TH AND WEST 50 ST,MANHATTAN,New York,3101,2,Housekeeping,,Commercial Vehicle,diesel,Petroleum,20.0,Gallons,0.0,1999-07-06,1999-07-06,2009-03-03
499053,0304463,`-NYCT,35TH ST AND 11TH AVE,MANHATTAN,New York,3101,2,Unknown,,Unknown,unknown petroleum,Petroleum,0.0,Gallons,0.0,2003-07-28,2003-07-28,2014-01-08
499054,9213322,`-NYCT / 146 ST,721 LENOX AVE,MANHATTAN,New York,3101,2,Tank Overfill,,Commercial/Industrial,#2 fuel oil,Petroleum,200.0,Gallons,0.0,1993-03-02,1993-03-02,2003-02-10
499055,8909580,`-NYCT BUS DEPOT - S I,CASTLETON AVE @ RECTOR ST,STATEN ISLAND,Richmond,4301,2,Abandoned Drums,,Commercial/Industrial,unknown petroleum,Petroleum,0.0,,0.0,1990-01-05,1990-01-05,1990-01-05


In [7]:
replaced_projeto_spill4 = replaced_projeto_spill4.withColumn(
    "Street 1",
    when(
        (col("Street 1").isNull() | (col("Street 1") == None)), 
        "Desconhecida"
    ).otherwise(col("Street 1")))

In [8]:
replaced_projeto_spill4 = replaced_projeto_spill4.withColumn(
    "Locality",
    when(
        (col("Locality").isNull() | (col("Locality") == None)), 
        "Desconhecida"
    ).otherwise(col("Locality")))

In [9]:
replaced_projeto_spill4 = replaced_projeto_spill4.withColumn(
    "Waterbody",
    when(
        (col("Waterbody").isNull() | (col("Waterbody") == None)), 
        "Nenhuma"
    ).otherwise(col("Waterbody")))

In [10]:
replaced_projeto_spill4 = replaced_projeto_spill4.withColumn(
    "Units",
    when(
        (col("Units").isNull() | (col("Units") == None)), 
        "Desconhecida"
    ).otherwise(col("Units")))

In [11]:
replaced_projeto_spill5 = replaced_projeto_spill4.withColumn(
    "DEC Region",
    when(
        ((col("DEC Region") == 1) | (col("DEC Region") == 3) | (col("DEC Region") == 4) | (col("DEC Region") == 5) | (col("DEC Region") == 6) | (col("DEC Region") == 8)), 
        None
    ).otherwise(col("DEC Region")))

replaced_projeto_spill6 = replaced_projeto_spill5.na.drop()
replaced_projeto_spill6.toPandas()


Unnamed: 0,Spill Number,Program Facility Name,Street 1,Locality,Contry,SWIS Code,DEC Region,Contributing Factor,Waterbody,Source,Material Name,Material Family,Quantity,Units,Recovered,Data_Derrame,Data_Relatada,Data_Fim
0,0405586,BOWRY BAY,WATER POLL CONTROL,QUEENS,Queens,4101,2,Other,EAST RIVER,Unknown,raw sewage,Other,0.0,Pounds,0.0,2004-08-21,2004-08-21,2004-09-17
1,0405586,BOWRY BAY,WATER POLL CONTROL,QUEENS,Queens,4101,2,Other,EAST RIVER,Unknown,raw sewage,Other,0.0,Desconhecida,0.0,2004-08-21,2004-08-21,2004-09-17
2,0204667,POLE 16091,GRACE AVE/BURKE AVE,BRONX,Bronx,301,2,Equipment Failure,Nenhuma,Commercial/Industrial,transformer oil,Petroleum,1.0,Gallons,0.0,2002-08-02,2002-08-02,2002-10-28
3,0311484,PRIVATE RESIDENCE,6568 GLEN HAVEN RD,SCOTT,Cortland,1238,7,Equipment Failure,Nenhuma,Private Dwelling,#2 fuel oil,Petroleum,75.0,Gallons,0.0,2004-01-11,2004-01-11,2004-08-25
4,0104307,149TH RD,"183RD ST, 149TH AV& 149RD",QUEENS,Queens,4101,2,Abandoned Drums,Nenhuma,Unknown,unknown material,Other,0.0,Gallons,0.0,2001-07-23,2001-07-23,2001-08-01
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
168047,9904003,`-NYCT,5TH AND WEST 50 ST,MANHATTAN,New York,3101,2,Housekeeping,Nenhuma,Commercial Vehicle,diesel,Petroleum,20.0,Gallons,0.0,1999-07-06,1999-07-06,2009-03-03
168048,0304463,`-NYCT,35TH ST AND 11TH AVE,MANHATTAN,New York,3101,2,Unknown,Nenhuma,Unknown,unknown petroleum,Petroleum,0.0,Gallons,0.0,2003-07-28,2003-07-28,2014-01-08
168049,9213322,`-NYCT / 146 ST,721 LENOX AVE,MANHATTAN,New York,3101,2,Tank Overfill,Nenhuma,Commercial/Industrial,#2 fuel oil,Petroleum,200.0,Gallons,0.0,1993-03-02,1993-03-02,2003-02-10
168050,8909580,`-NYCT BUS DEPOT - S I,CASTLETON AVE @ RECTOR ST,STATEN ISLAND,Richmond,4301,2,Abandoned Drums,Nenhuma,Commercial/Industrial,unknown petroleum,Petroleum,0.0,Desconhecida,0.0,1990-01-05,1990-01-05,1990-01-05


In [12]:
replaced_projeto_spill7 = replaced_projeto_spill6.withColumn("Data_Derrame", to_date(col("Data_Derrame"), "yyyy-MM-dd")) \
                                               .withColumn("Data_Relatada", to_date(col("Data_Relatada"), "yyyy-MM-dd")) \
                                                .withColumn("Data_Fim", to_date(col("Data_Fim"), "yyyy-MM-dd"))

replaced_projeto_spill8 = replaced_projeto_spill7.withColumn('Ano', (split(replaced_projeto_spill7['Data_Derrame'], '-').getItem(0)).cast(IntegerType()))
replaced_projeto_spill8.show()


+------------+---------------------+--------------------+-------------+--------+---------+----------+-------------------+----------+--------------------+------------------+---------------+--------+------------+---------+------------+-------------+----------+----+
|Spill Number|Program Facility Name|            Street 1|     Locality|  Contry|SWIS Code|DEC Region|Contributing Factor| Waterbody|              Source|     Material Name|Material Family|Quantity|       Units|Recovered|Data_Derrame|Data_Relatada|  Data_Fim| Ano|
+------------+---------------------+--------------------+-------------+--------+---------+----------+-------------------+----------+--------------------+------------------+---------------+--------+------------+---------+------------+-------------+----------+----+
|    0405586 |   BOWRY BAY      ...|  WATER POLL CONTROL|       QUEENS|  Queens|     4101|         2|              Other|EAST RIVER|             Unknown|        raw sewage|          Other|     0.0|      Pound

In [14]:
replaced_projeto_spill8.printSchema()

root
 |-- Spill Number: string (nullable = true)
 |-- Program Facility Name: string (nullable = true)
 |-- Street 1: string (nullable = true)
 |-- Locality: string (nullable = true)
 |-- Contry: string (nullable = true)
 |-- SWIS Code: integer (nullable = true)
 |-- DEC Region: integer (nullable = true)
 |-- Contributing Factor: string (nullable = true)
 |-- Waterbody: string (nullable = true)
 |-- Source: string (nullable = true)
 |-- Material Name: string (nullable = true)
 |-- Material Family: string (nullable = true)
 |-- Quantity: float (nullable = true)
 |-- Units: string (nullable = true)
 |-- Recovered: float (nullable = true)
 |-- Data_Derrame: date (nullable = true)
 |-- Data_Relatada: date (nullable = true)
 |-- Data_Fim: date (nullable = true)
 |-- Ano: integer (nullable = true)



In [15]:
replaced_projeto_spill9 = replaced_projeto_spill8 \
    .withColumnRenamed("Spill Number","Spill_Number") \
    .withColumnRenamed("Program Facility Name","Program_Facility_Name") \
    .withColumnRenamed("Street 1","Street") \
    .withColumnRenamed("SWIS Code","SWIS_Code") \
    .withColumnRenamed("DEC Region","DEC_Region")\
    .withColumnRenamed("Contributing Factor","Contributing_Factor") \
    .withColumnRenamed("Material Name","Material_Name") \
    .withColumnRenamed("Material Family","Material_Family")

In [16]:
replaced_projeto_spill9.toPandas()

Unnamed: 0,Spill_Number,Program_Facility_Name,Street,Locality,Contry,SWIS_Code,DEC_Region,Contributing_Factor,Waterbody,Source,Material_Name,Material_Family,Quantity,Units,Recovered,Data_Derrame,Data_Relatada,Data_Fim,Ano
0,0405586,BOWRY BAY,WATER POLL CONTROL,QUEENS,Queens,4101,2,Other,EAST RIVER,Unknown,raw sewage,Other,0.0,Pounds,0.0,2004-08-21,2004-08-21,2004-09-17,2004.0
1,0405586,BOWRY BAY,WATER POLL CONTROL,QUEENS,Queens,4101,2,Other,EAST RIVER,Unknown,raw sewage,Other,0.0,Desconhecida,0.0,2004-08-21,2004-08-21,2004-09-17,2004.0
2,0204667,POLE 16091,GRACE AVE/BURKE AVE,BRONX,Bronx,301,2,Equipment Failure,Nenhuma,Commercial/Industrial,transformer oil,Petroleum,1.0,Gallons,0.0,2002-08-02,2002-08-02,2002-10-28,2002.0
3,0311484,PRIVATE RESIDENCE,6568 GLEN HAVEN RD,SCOTT,Cortland,1238,7,Equipment Failure,Nenhuma,Private Dwelling,#2 fuel oil,Petroleum,75.0,Gallons,0.0,2004-01-11,2004-01-11,2004-08-25,2004.0
4,0104307,149TH RD,"183RD ST, 149TH AV& 149RD",QUEENS,Queens,4101,2,Abandoned Drums,Nenhuma,Unknown,unknown material,Other,0.0,Gallons,0.0,2001-07-23,2001-07-23,2001-08-01,2001.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
168047,9904003,`-NYCT,5TH AND WEST 50 ST,MANHATTAN,New York,3101,2,Housekeeping,Nenhuma,Commercial Vehicle,diesel,Petroleum,20.0,Gallons,0.0,1999-07-06,1999-07-06,2009-03-03,1999.0
168048,0304463,`-NYCT,35TH ST AND 11TH AVE,MANHATTAN,New York,3101,2,Unknown,Nenhuma,Unknown,unknown petroleum,Petroleum,0.0,Gallons,0.0,2003-07-28,2003-07-28,2014-01-08,2003.0
168049,9213322,`-NYCT / 146 ST,721 LENOX AVE,MANHATTAN,New York,3101,2,Tank Overfill,Nenhuma,Commercial/Industrial,#2 fuel oil,Petroleum,200.0,Gallons,0.0,1993-03-02,1993-03-02,2003-02-10,1993.0
168050,8909580,`-NYCT BUS DEPOT - S I,CASTLETON AVE @ RECTOR ST,STATEN ISLAND,Richmond,4301,2,Abandoned Drums,Nenhuma,Commercial/Industrial,unknown petroleum,Petroleum,0.0,Desconhecida,0.0,1990-01-05,1990-01-05,1990-01-05,1990.0


In [17]:
spark.sql(
    """
    DROP TABLE IF EXISTS Projeto.Tabela_Petroleo
    """
)

spark.sql(
    """
    CREATE EXTERNAL TABLE Projeto.Tabela_Petroleo (
        Spill_Number VARCHAR(50),
        Program_Facility_Name VARCHAR(500), 
        Street VARCHAR(500),
        Locality VARCHAR(50),
        Contry VARCHAR(50),
        SWIS_Code int,
        DEC_Region int,
        Contributing_Factor VARCHAR(500),
        Waterbody VARCHAR(50),
        Source VARCHAR(50),
        Material_Name VARCHAR(500),
        Material_Family VARCHAR(500),
        Quantity float,
        Units VARCHAR(100),
        Recovered float,
        Data_Derrame date,
        Data_Relatada date,
        Data_Fim date

    )
       USING DELTA
   
   PARTITIONED BY (
        Ano INT

    )
    LOCATION 'hdfs://hdfs-nn:9000/TrabalhoPL/silver/Projeto.db/Tabela_Petroleo'
    """
)

DataFrame[]

In [18]:
#write df to hive deltalake_table
replaced_projeto_spill9 \
    .select("Spill_Number","Program_Facility_Name","Street","Locality","Contry","SWIS_Code",
            "DEC_Region","Contributing_Factor","Waterbody", "Source", "Material_Name", "Material_Family", 
            "Quantity", "Units", "Recovered", "Data_Derrame", "Data_Relatada", "Data_Fim", "Ano") \
    .write \
    .mode("overwrite") \
    .partitionBy("Ano") \
    .format("delta") \
    .save("hdfs://hdfs-nn:9000/TrabalhoPL/silver/Projeto.db/Tabela_Petroleo")
from pyspark.sql.types import *

In [19]:
spark.sql("USE Projeto")
spark.sql("SHOW tables").show()

+---------+-----------------+-----------+
|namespace|        tableName|isTemporary|
+---------+-----------------+-----------+
|  projeto|             agua|      false|
|  projeto|      tabela_agua|      false|
|  projeto|        tabela_ar|      false|
|  projeto|  tabela_petroleo|      false|
|  projeto|tabela_reciclagem|      false|
+---------+-----------------+-----------+



In [20]:
spark.table("Projeto.Tabela_Petroleo").show()

+------------+---------------------+--------------------+----------------+--------+---------+----------+-------------------+--------------------+--------------------+--------------------+------------------+--------+------------+---------+------------+-------------+----------+----+
|Spill_Number|Program_Facility_Name|              Street|        Locality|  Contry|SWIS_Code|DEC_Region|Contributing_Factor|           Waterbody|              Source|       Material_Name|   Material_Family|Quantity|       Units|Recovered|Data_Derrame|Data_Relatada|  Data_Fim| Ano|
+------------+---------------------+--------------------+----------------+--------+---------+----------+-------------------+--------------------+--------------------+--------------------+------------------+--------+------------+---------+------------+-------------+----------+----+
|    9506819 | 1 BEEKMAN PLACE  ...|     1 BEEKMAN PLACE|       MANHATTAN|New York|     3101|         2|            Unknown|             Nenhuma|Instituti