In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import Row

# Default HDFS location under which managed databases and tables are stored.
warehouse_location = 'hdfs://hdfs-nn:9000/warehouse'

# Build (or reuse) a session with Hive support enabled, pointed at the
# project's warehouse directory and Hive metastore.
spark = (
    SparkSession.builder
    .appName("Python Spark SQL Hive integration TABD project")
    .config("spark.sql.warehouse.dir", warehouse_location)
    .config("hive.metastore.uris", "thrift://hive-metastore:9083")
    .enableHiveSupport()
    .getOrCreate()
)

In [2]:
# List every database currently registered in the catalog.
list_databases_sql = """
    SHOW DATABASES
    """
spark.sql(list_databases_sql).show()

+------------+
|   namespace|
+------------+
|     default|
|project_tabd|
+------------+



In [3]:
# Start from a clean slate: drop the project database if it is present.
# CASCADE also drops the tables registered inside it.
drop_database_sql = """
    DROP DATABASE IF EXISTS project_tabd CASCADE
    """
spark.sql(drop_database_sql)

DataFrame[]

In [5]:
# You can choose any location in HDFS, but keep it organized: a data lake
# grows over time and turns into a swamp without a clear layout.
#
# IF NOT EXISTS keeps this cell safe to re-run when the database is already
# present (for example when the DROP DATABASE cell was skipped). In that case
# the existing database, including its current location, is left untouched.
spark.sql(
    """
    CREATE DATABASE IF NOT EXISTS project_tabd LOCATION 'hdfs://hdfs-nn:9000/warehouse/tabd.db/'
    """
)

DataFrame[]

In [3]:
# Check the catalog again to confirm that project_tabd is listed.
list_databases_sql = """
    SHOW DATABASES
    """
spark.sql(list_databases_sql).show()

+------------+
|   namespace|
+------------+
|     default|
|project_tabd|
+------------+



In [3]:
# List the tables registered in the project database.
list_tables_sql = """
    SHOW TABLES FROM project_tabd
    """
spark.sql(list_tables_sql).show()

+------------+--------------------+-----------+
|    database|           tableName|isTemporary|
+------------+--------------------+-----------+
|project_tabd|parquet_table_wea...|      false|
|project_tabd|parquet_table_wea...|      false|
+------------+--------------------+-----------+



In [4]:
# Drop any previous definition so the CREATE below does not fail on an
# already existing table.
drop_gold_table_sql = """
    DROP TABLE IF EXISTS project_tabd.parquet_table_weather_gold
    """
spark.sql(drop_gold_table_sql)

# External Parquet table for the gold weather data, partitioned by year and
# stored under the project database directory in HDFS.
create_gold_table_sql = """
    CREATE EXTERNAL TABLE project_tabd.parquet_table_weather_gold (
        date DATE,
        month INT,
        day INT,
        station_id VARCHAR(15),
        station_name VARCHAR(100),
        latitude DOUBLE,
        longitude DOUBLE,
        air_temperature_avg DOUBLE,
        air_temperature_min DOUBLE,
        air_temperature_max DOUBLE,
        global_radiation_total DOUBLE,
        rain_precipitation_qty DOUBLE,
        country VARCHAR(50),
        city VARCHAR(50),
        day_of_year INT,
        season VARCHAR(15)
        
               
    )
    STORED AS PARQUET
    PARTITIONED BY (
        year INT
    )
    LOCATION 'hdfs://hdfs-nn:9000/warehouse/tabd.db/parquet_table_weather_gold/'
    """
spark.sql(create_gold_table_sql)

DataFrame[]

In [5]:
# Inspect the table definition; converting to pandas gives a readable,
# untruncated rendering of the description rows.
describe_gold_table_sql = """
    DESCRIBE FORMATTED project_tabd.parquet_table_weather_gold
    """
gold_table_description = spark.sql(describe_gold_table_sql)
gold_table_description.toPandas()

Unnamed: 0,col_name,data_type,comment
0,date,date,
1,month,int,
2,day,int,
3,station_id,varchar(15),
4,station_name,varchar(100),
5,latitude,double,
6,longitude,double,
7,air_temperature_avg,double,
8,air_temperature_min,double,
9,air_temperature_max,double,


In [6]:
# List the project tables again now that the gold table has been (re)created.
list_tables_sql = """
    SHOW TABLES FROM project_tabd
    """
spark.sql(list_tables_sql).show()

+------------+--------------------+-----------+
|    database|           tableName|isTemporary|
+------------+--------------------+-----------+
|project_tabd|parquet_table_wea...|      false|
|project_tabd|parquet_table_wea...|      false|
+------------+--------------------+-----------+



In [7]:
# Recovering partitions updates the Hive Metastore (catalog). Without this
# step, Hive and the query engines do not know that the partitioned table
# has new partitions.
gold_table_name = "project_tabd.parquet_table_weather_gold"
spark.catalog.recoverPartitions(gold_table_name)

# Preview the table contents (show() prints only the first rows).
preview_gold_sql = """
    SELECT *
    FROM project_tabd.parquet_table_weather_gold
    """
spark.sql(preview_gold_sql).show()


+----------+-----+---+----------+--------------------+-----------+-----------+-------------------+-------------------+-------------------+----------------------+----------------------+--------+----------------+-----------+-------+----+
|      date|month|day|station_id|        station_name|   latitude|  longitude|air_temperature_avg|air_temperature_min|air_temperature_max|global_radiation_total|rain_precipitation_qty| country|            city|day_of_year| season|year|
+----------+-----+---+----------+--------------------+-----------+-----------+-------------------+-------------------+-------------------+----------------------+----------------------+--------+----------------+-----------+-------+----+
|2006-01-01|    1|  1|   1200545|Porto - Pedras Ru...|41.23350278|-8.68133333|               11.1|                6.9|               14.3|                6267.1|                   0.7|Portugal|           Porto|          1|Inverno|2006|
|2006-01-01|    1|  1|   1200548| Coimbra (Aeródromo)|  

In [9]:
# Pull the descriptive station columns into pandas for display.
# NOTE(review): toPandas() collects every row to the driver; consider adding
# a LIMIT if the table grows.
station_columns_sql = """
    SELECT station_name, date,country,city, season
    FROM project_tabd.parquet_table_weather_gold

    """
station_rows = spark.sql(station_columns_sql)
station_rows.toPandas()

Unnamed: 0,station_name,date,country,city,season
0,Porto - Pedras Rubras,2006-01-01,Portugal,Porto,Inverno
1,Coimbra (Aeródromo),2006-01-01,Portugal,Coimbra,Inverno
2,Viana do Castelo - Chafé,2006-01-01,Portugal,Viana do Castelo,Inverno
3,Faro / Aeroporto,2006-01-01,Portugal,Faro,Inverno
4,Évora / Aeródromo,2006-01-01,Portugal,Évora,Inverno
...,...,...,...,...,...
138073,Guarda,2000-12-31,Portugal,Guarda,Inverno
138074,Aveiro / Universidade,2000-12-31,Portugal,Aveiro,Inverno
138075,Leiria / Aeródromo,2000-12-31,Portugal,Leiria,Inverno
138076,Santarém - Fonte Boa / Est. Zootécnica,2000-12-31,Portugal,Santarém,Inverno


In [12]:
# Let's look into HDFS

In [13]:
# Let's put the files into HDFS

In [10]:
# Shut down the Spark session now that the notebook has finished its work.
spark.stop()